Linux RDMA and InfiniBand development
 help / color / mirror / Atom feed
From: Md Haris Iqbal <haris.iqbal@ionos.com>
To: linux-block@vger.kernel.org, linux-rdma@vger.kernel.org
Cc: linux-kernel@vger.kernel.org, axboe@kernel.dk,
	bvanassche@acm.org, hch@lst.de, jgg@ziepe.ca, leon@kernel.org,
	jinpu.wang@ionos.com, Md Haris Iqbal <haris.iqbal@ionos.com>,
	Jia Li <jia.li@ionos.com>
Subject: [PATCH 05/13] RDMA/rmr: server: main functionality
Date: Tue,  5 May 2026 09:46:17 +0200	[thread overview]
Message-ID: <20260505074644.195453-6-haris.iqbal@ionos.com> (raw)
In-Reply-To: <20260505074644.195453-1-haris.iqbal@ionos.com>

Add the RMR server implementation:

  rmr-srv.c	server core: session handling, pool registration via
		rmr_srv_register(), incoming command and IO message
		processing, sync thread coordination and the
		rmr_srv_query()/rmr_srv_unregister() entry points used
		by upper-layer stores.
  rmr-srv-md.c	server-side metadata persistence: serialising the
		pool metadata (member ID, map version, mapped size,
		store state) and the dirty maps to the underlying
		store, plus the periodic md_sync delayed work.

The server interacts with an upper-layer store via the
struct rmr_srv_store_ops interface defined in rmr-srv.h, allowing
different store implementations (block device, file, ...) to plug
in without modifying RMR itself.

These files are not compiled until the modules are wired into the
build in a later patch in this series.

Signed-off-by: Md Haris Iqbal <haris.iqbal@ionos.com>
Signed-off-by: Jia Li <jia.li@ionos.com>
---
 drivers/infiniband/ulp/rmr/rmr-srv-md.c |  764 ++++++
 drivers/infiniband/ulp/rmr/rmr-srv.c    | 3306 +++++++++++++++++++++++
 2 files changed, 4070 insertions(+)
 create mode 100644 drivers/infiniband/ulp/rmr/rmr-srv-md.c
 create mode 100644 drivers/infiniband/ulp/rmr/rmr-srv.c

diff --git a/drivers/infiniband/ulp/rmr/rmr-srv-md.c b/drivers/infiniband/ulp/rmr/rmr-srv-md.c
new file mode 100644
index 000000000000..9dab71a810b8
--- /dev/null
+++ b/drivers/infiniband/ulp/rmr/rmr-srv-md.c
@@ -0,0 +1,764 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Reliable multicast over RTRS (RMR) — server metadata subsystem
+ *
+ * Copyright (c) 2026 IONOS SE
+ */
+
+#undef pr_fmt
+#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
+
+#include <linux/module.h>
+#include <linux/blkdev.h>
+
+#include "rmr-srv.h"
+#include "rmr-req.h"
+#include "rmr-clt.h"
+
+/**
+ * process_md_io() - Submit one metadata IO request
+ *
+ * @pool:	the pool the request goes through
+ * @rtrs_op:	rtrs IO context (the callers in this file pass NULL)
+ * @offset:	offset in bytes, relative to the start of the rmr metadata
+ * @len:	length of @buf in bytes
+ * @flags:	metadata IO option (RMR_OP_MD_READ or RMR_OP_MD_WRITE in this file)
+ * @buf:	pointer to the metadata buffer
+ *
+ * Return:
+ *	0 once the request has been handed to rmr_md_req_submit(),
+ *	-EIO if no live reference on pool->ids_inflight_ref could be taken,
+ *	the error encoded by rmr_srv_md_req_create() otherwise.
+ *
+ * Description:
+ *	All metadata IOs go through this function. @offset is relative to the rmr metadata
+ *	area, which is composed of a header structure for the pool metadata, the last_io
+ *	array and the bitmap.
+ */
+int process_md_io(struct rmr_pool *pool, struct rtrs_srv_op *rtrs_op, u32 offset, u32 len,
+			 unsigned long flags, void *buf)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	struct rmr_srv_req *md_req;
+
+	/*
+	 * Take an inflight reference for the request. On success it is not dropped here;
+	 * presumably the completion path (rmr_srv_endreq) releases it — TODO confirm.
+	 */
+	if (!percpu_ref_tryget_live(&pool->ids_inflight_ref))
+		return -EIO;
+
+	md_req = rmr_srv_md_req_create(srv_pool, rtrs_op, buf, offset, len, flags, rmr_srv_endreq);
+	if (IS_ERR(md_req)) {
+		pr_err("Failed to create rmr_req %pe\n", md_req);
+		percpu_ref_put(&pool->ids_inflight_ref);
+		return PTR_ERR(md_req);
+	}
+
+	rmr_md_req_submit(md_req);
+	return 0;
+}
+
+/**
+ * rmr_srv_read_md() - Issue a metadata read through process_md_io()
+ *
+ * @pool:		the pool the request goes through
+ * @rtrs_op:		rtrs IO context, forwarded unchanged
+ * @offset:		offset in bytes relative to the rmr metadata
+ * @len:		number of bytes to read
+ * @pool_md_page:	caller-provided, pre-allocated destination buffer
+ *
+ * Return:
+ *	Whatever process_md_io() returns for an RMR_OP_MD_READ request.
+ */
+int rmr_srv_read_md(struct rmr_pool *pool, struct rtrs_srv_op *rtrs_op, u32 offset, u32 len,
+		    struct rmr_pool_md *pool_md_page)
+{
+	void *dst = pool_md_page;
+
+	return process_md_io(pool, rtrs_op, offset, len, RMR_OP_MD_READ, dst);
+}
+
+/**
+ * rmr_srv_load_last_io() - Read the last_io region into srv_pool->last_io_idx
+ *
+ * @srv_pool:	Server pool whose last_io_idx array is to be filled
+ *
+ * Description:
+ *	Allocates srv_pool->last_io_idx on first use (queue_depth entries), reads
+ *	rmr_last_io_len(queue_depth) bytes at RMR_LAST_IO_OFFSET into a bounce buffer and
+ *	copies them into last_io_idx.
+ *
+ * Return:
+ *	0 on success, -EINVAL if the pool metadata has a zero queue_depth, -ENOMEM on
+ *	allocation failure, or the error returned by rmr_srv_read_md().
+ */
+static int rmr_srv_load_last_io(struct rmr_srv_pool *srv_pool)
+{
+	void *buf;
+	u64 offset, len;
+	struct rmr_pool *pool = srv_pool->pool;
+	struct rmr_pool_md *pool_md = &pool->pool_md;
+	int err;
+
+	if (!pool_md->queue_depth) {
+		pr_err("%s: pool %s has zero queue_depth\n",
+		       __func__, pool->poolname);
+		return -EINVAL;
+	}
+	offset = RMR_LAST_IO_OFFSET;
+	len = rmr_last_io_len(pool_md->queue_depth);
+
+	/*
+	 * NOTE(review): an already allocated last_io_idx is reused as is; this assumes it
+	 * was sized for the current queue_depth and that len does not exceed
+	 * queue_depth * sizeof(*last_io_idx) — confirm.
+	 */
+	if (!srv_pool->last_io_idx) {
+		srv_pool->last_io_idx = kcalloc(pool_md->queue_depth,
+						sizeof(*srv_pool->last_io_idx), GFP_KERNEL);
+		if (!srv_pool->last_io_idx)
+			return -ENOMEM;
+	}
+
+	buf = kzalloc(len, GFP_KERNEL);
+	if (!buf)
+		return -ENOMEM;
+
+	err = rmr_srv_read_md(pool, NULL, offset, len, buf);
+	if (err) {
+		/* len and offset are u64, so they are printed unsigned */
+		pr_err("%s: failed to read last_io buffer of len %llu at offset %llu\n",
+		       __func__, len, offset);
+		goto free_buf;
+	}
+	memcpy(srv_pool->last_io_idx, buf, len);
+
+free_buf:
+	kfree(buf);
+	return err;
+}
+
+/**
+ * rmr_srv_md_maps_sync - Sync dirty maps to persistent storage
+ *
+ * @pool:	pool whose maps are written out
+ *
+ * Description:
+ *	Writes maps in two passes to the map-related regions of the on-disk layout:
+ *
+ *	Pass 1 — hdr_region (single write at RMR_MD_SIZE + last_io_len):
+ *	  Fills one rmr_map_cbuf_hdr slot per map_idx in [0:maps_cnt].
+ *	  The buffer is kzalloc'd, so slots beyond maps_cnt are zero.
+ *	  The whole RMR_MAP_BUF_HDR_SIZE region is issued as a single I/O; the length
+ *	  matches both the allocation below and the read in rmr_srv_refresh_md_maps().
+ *
+ *	Pass 2 — maps_region (slp pages at computed offsets after hdr_region):
+ *	  Each map's data offset = map_region_offset + map_idx * per_map_size.
+ *	  pool->maps[0:maps_cnt] is always dense (no NULL gaps).
+ *
+ *	Failures are only logged; nothing is reported to the caller.
+ */
+void rmr_srv_md_maps_sync(struct rmr_pool *pool)
+{
+	struct rmr_map_cbuf_hdr *map_cbuf_hdr;
+	struct rmr_dirty_id_map *map = NULL;
+	u32 hdr_region_offset = rmr_bitmap_offset(pool->pool_md.queue_depth);
+	u32 map_region_offset = hdr_region_offset + RMR_MAP_BUF_HDR_SIZE;
+	u64 per_map_size = 0;
+	int err, lock_idx;
+	void *buf;
+	u8 map_idx;
+
+	buf = kzalloc(RMR_MAP_BUF_HDR_SIZE, GFP_KERNEL);
+	if (!buf)
+		return;
+
+	lock_idx = srcu_read_lock(&pool->map_srcu);
+
+	/* Fill the header region: one slot per active map */
+	for (map_idx = 0; map_idx < pool->maps_cnt; map_idx++) {
+		/* The read side is SRCU, so use the SRCU flavour of dereference */
+		map = srcu_dereference(pool->maps[map_idx], &pool->map_srcu);
+		if (WARN_ON(!map))
+			goto unlock;
+
+		map_cbuf_hdr = buf + map_idx * sizeof(struct rmr_map_cbuf_hdr);
+		map_cbuf_hdr->version = RMR_MAP_FORMAT_VER;
+		map_cbuf_hdr->member_id = map->member_id;
+		map_cbuf_hdr->no_of_chunks = map->no_of_chunks;
+		map_cbuf_hdr->no_of_flp = map->no_of_flp;
+		map_cbuf_hdr->no_of_slp_in_last_flp = map->no_of_slp_in_last_flp;
+		map_cbuf_hdr->no_of_chunk_in_last_slp = map->no_of_chunk_in_last_slp;
+		map_cbuf_hdr->total_slp = map->total_slp;
+		/* The last map wins; all maps are assumed to share one geometry */
+		per_map_size = map->total_slp * PAGE_SIZE;
+	}
+
+	/* Write exactly the bytes that were allocated for the header region */
+	err = process_md_io(pool, NULL, hdr_region_offset,
+			RMR_MAP_BUF_HDR_SIZE, RMR_OP_MD_WRITE, buf);
+	if (err) {
+		pr_warn("%s: failed to write header region at 0x%x: %d\n",
+			__func__, hdr_region_offset, err);
+		goto unlock;
+	}
+
+	if (WARN_ON(!per_map_size))
+		goto unlock;
+
+	/* Write each map's slp pages */
+	for (map_idx = 0; map_idx < pool->maps_cnt; map_idx++) {
+		u32 map_data_offset;
+		el_flp *flp_ptr;
+		u64 no_of_slps;
+		void *slp;
+		int i, j;
+
+		map = srcu_dereference(pool->maps[map_idx], &pool->map_srcu);
+		if (WARN_ON(!map))
+			break;
+
+		/*
+		 * NOTE(review): the offset is 32 bit because process_md_io() takes a u32;
+		 * map_idx * per_map_size is silently truncated if it does not fit.
+		 */
+		map_data_offset = map_region_offset + map_idx * per_map_size;
+
+		for (i = 0; i < map->no_of_flp; i++) {
+			flp_ptr = (el_flp *)map->dirty_bitmap[i];
+
+			/* Only the last first-level page may be partially populated */
+			if (i == (map->no_of_flp - 1))
+				no_of_slps = map->no_of_slp_in_last_flp;
+			else
+				no_of_slps = NO_OF_SLP_PER_FLP;
+
+			for (j = 0; j < no_of_slps; j++, flp_ptr++) {
+				slp = (void *)(*flp_ptr);
+
+				/* Best effort: a failed page is logged and the walk continues */
+				err = process_md_io(pool, NULL, map_data_offset,
+						PAGE_SIZE, RMR_OP_MD_WRITE, slp);
+				if (err)
+					pr_warn("%s: failed to write map slp at 0x%x: %d\n",
+						__func__, map_data_offset, err);
+				map_data_offset += PAGE_SIZE;
+			}
+		}
+	}
+
+unlock:
+	srcu_read_unlock(&pool->map_srcu, lock_idx);
+	kfree(buf);
+}
+
+/**
+ * rmr_srv_refresh_md_maps - Restore maps from map buffers on disk
+ *
+ * @srv_pool:	Server pool whose maps are to be restored
+ *
+ * Description:
+ *	Reads back the maps written by rmr_srv_md_maps_sync(). Reads the hdr_region
+ *	in a single I/O to obtain the per-map headers, then loads each present
+ *	map's slp pages from maps_region:
+ *	  data offset = map_region_offset + map_idx * per_map_size
+ *	Header slots 0..N-1 are active; remaining are zero (member_id == 0).
+ *
+ * Return:
+ *	0 on success (including the case where no valid header slot was found, which is
+ *	only logged), -ENOMEM on allocation failure, -EINVAL if an on-disk header does not
+ *	match the geometry of the in-memory map, or the error from rmr_map_create() or
+ *	rmr_srv_read_md().
+ */
+static int rmr_srv_refresh_md_maps(struct rmr_srv_pool *srv_pool)
+{
+	struct rmr_pool *pool = srv_pool->pool;
+	struct rmr_map_cbuf_hdr *map_cbuf_hdr;
+	struct rmr_dirty_id_map *map = NULL;
+	u32 hdr_region_offset = rmr_bitmap_offset(pool->pool_md.queue_depth);
+	u32 map_region_offset = hdr_region_offset + RMR_MAP_BUF_HDR_SIZE;
+	int err = 0, lock_idx;
+	void *buf;
+	u8 map_idx, valid_nr = 0;
+	bool unpack;
+
+	buf = kzalloc(RMR_MAP_BUF_HDR_SIZE, GFP_KERNEL);
+	if (!buf)
+		return -ENOMEM;
+
+	/* Read the entire header region in a single I/O */
+	err = rmr_srv_read_md(pool, NULL, hdr_region_offset, RMR_MAP_BUF_HDR_SIZE, buf);
+	if (err) {
+		pr_err("%s: failed to read header region at offset %u\n",
+				__func__, hdr_region_offset);
+		kfree(buf);
+		return err;
+	}
+
+	lock_idx = srcu_read_lock(&pool->map_srcu);
+	for (map_idx = 0; map_idx < RMR_POOL_MAX_SESS; map_idx++) {
+		u64 per_map_size;
+		u32 map_data_offset;
+		el_flp *flp_ptr;
+		u64 no_of_slps;
+		void *slp;
+		int i, j;
+
+		map_cbuf_hdr = buf + map_idx * sizeof(struct rmr_map_cbuf_hdr);
+		pr_debug("%s: %llu %u %llu %llu %llu %llu %llu\n", __func__,
+			map_cbuf_hdr->version,
+			map_cbuf_hdr->member_id,
+			map_cbuf_hdr->no_of_chunks,
+			map_cbuf_hdr->no_of_flp,
+			map_cbuf_hdr->no_of_slp_in_last_flp,
+			map_cbuf_hdr->no_of_chunk_in_last_slp,
+			map_cbuf_hdr->total_slp);
+
+		/* Empty slot: no more active maps beyond this point */
+		if (!map_cbuf_hdr->member_id)
+			break;
+		valid_nr++;
+
+		per_map_size = map_cbuf_hdr->total_slp * PAGE_SIZE;
+		map_data_offset = map_region_offset + map_idx * per_map_size;
+
+		unpack = false;
+		/*
+		 * The dirty map should be updated only when the one on disk is more updated.
+		 * Such cases are as follows.
+		 * 1) The dirty map does not exist in the pool. The map will be simply restored to
+		 * the last version we have.
+		 * 2) The dirty map of the pool is just created. If it has been updated, the one on
+		 * disk is outdated.
+		 */
+		map = rmr_pool_find_map(pool, map_cbuf_hdr->member_id);
+		if (!map) {
+			map = rmr_map_create(pool, map_cbuf_hdr->member_id);
+			if (IS_ERR(map)) {
+				err = PTR_ERR(map);
+				pr_err("%s: pool %s, member_id %d failed to create map\n",
+				       __func__, pool->poolname, map_cbuf_hdr->member_id);
+				goto unlock;
+			}
+			unpack = true;
+		} else if (rmr_map_empty(map)) {
+			unpack = true;
+		}
+
+		/* The on-disk geometry must match the in-memory map before any page is loaded */
+		if (map->no_of_chunks != map_cbuf_hdr->no_of_chunks ||
+				map->no_of_flp != map_cbuf_hdr->no_of_flp ||
+				map->no_of_slp_in_last_flp != map_cbuf_hdr->no_of_slp_in_last_flp ||
+				map->no_of_chunk_in_last_slp != map_cbuf_hdr->no_of_chunk_in_last_slp ||
+				map->total_slp != map_cbuf_hdr->total_slp) {
+			pr_err("%s: Sanity check failed\n", __func__);
+			/* Report the mismatch instead of returning success */
+			err = -EINVAL;
+			goto unlock;
+		}
+
+		/* NOTE(review): the result of xa_store() is not checked */
+		xa_store(&pool->stg_members, map_cbuf_hdr->member_id, XA_TRUE, GFP_KERNEL);
+
+		if (!unpack)
+			continue;
+
+		for (i = 0; i < map->no_of_flp; i++) {
+			flp_ptr = (el_flp *)map->dirty_bitmap[i];
+
+			/* Only the last first-level page may be partially populated */
+			if (i == (map->no_of_flp - 1))
+				no_of_slps = map->no_of_slp_in_last_flp;
+			else
+				no_of_slps = NO_OF_SLP_PER_FLP;
+
+			for (j = 0; j < no_of_slps; j++, flp_ptr++) {
+				slp = (void *)(*flp_ptr);
+
+				err = rmr_srv_read_md(pool, NULL, map_data_offset,
+						PAGE_SIZE, slp);
+				if (err) {
+					pr_err("%s: failed to read bitmap at offset %u\n",
+						__func__, map_data_offset);
+					goto unlock;
+				}
+				map_data_offset += PAGE_SIZE;
+			}
+		}
+	}
+
+unlock:
+	if (!valid_nr)
+		pr_err("%s: no valid map found in metadata\n", __func__);
+
+	/*
+	 * TODO: We need better error handling logic here.
+	 * Lets suppose after successfully reading few pages for a map, we fail to read next page.
+	 * We then error out and fail the register, but leave the partially updated map in the pool.
+	 * Later when another register is called, and we come here to read the maps, we will
+	 * see a non-empty map, and skip reading the map from disk.
+	 */
+	srcu_read_unlock(&pool->map_srcu, lock_idx);
+	kfree(buf);
+	return err;
+}
+
+/**
+ * rmr_srv_md_update() - update the metadata of the server pool
+ *
+ * @srv_pool:	Server pool whose srv_md slot is refreshed
+ *
+ * Description:
+ *	Snapshots the current in-memory state of this server pool (member id, store state,
+ *	map version and pool state) into its srv_md slot of pool->pool_md.
+ *
+ * Return:
+ *	0 on success, -EINVAL if rmr_pool_find_md() yields no slot for this member.
+ */
+static int rmr_srv_md_update(struct rmr_srv_pool *srv_pool)
+{
+	struct rmr_pool *pool = srv_pool->pool;
+	struct rmr_srv_md *slot;
+	int slot_idx;
+
+	/* presumably the 'true' argument asks for a free slot when none exists — TODO confirm */
+	slot_idx = rmr_pool_find_md(&pool->pool_md, srv_pool->member_id, true);
+	if (slot_idx < 0) {
+		pr_warn("No space for new member %d.\n", srv_pool->member_id);
+		return -EINVAL;
+	}
+
+	slot = &pool->pool_md.srv_md[slot_idx];
+	slot->member_id = srv_pool->member_id;
+	slot->store_state = atomic_read(&srv_pool->store_state);
+	slot->map_ver = pool->map_ver;
+	slot->srv_pool_state = atomic_read(&srv_pool->state);
+
+	pr_debug("Set srv_md[%d] it with the member_id %d.\n", slot_idx, srv_pool->member_id);
+	return 0;
+}
+
+/**
+ * rmr_srv_flush_pool_md() - Write pool_md region to disk immediately
+ *
+ * @srv_pool:	Server pool whose pool_md is to be flushed
+ *
+ * Description:
+ *	Refreshes this server's srv_md slot and writes the pool_md region (offset 0,
+ *	RMR_MD_SIZE bytes) without waiting for the delayed work. Nothing is done while the
+ *	store state is zero or the pool has no mapped size. Failures are only logged.
+ */
+void rmr_srv_flush_pool_md(struct rmr_srv_pool *srv_pool)
+{
+	struct rmr_pool *pool = srv_pool->pool;
+	void *md_buf;
+	int err;
+
+	if (!atomic_read(&srv_pool->store_state))
+		return;
+	if (!pool->mapped_size)
+		return;
+
+	err = rmr_srv_md_update(srv_pool);
+	if (err) {
+		pr_warn("%s: failed to update pool_md before flush: 0x%x\n", __func__, err);
+		return;
+	}
+
+	/* Zeroed bounce buffer: the tail beyond the structure is written as zeroes */
+	md_buf = kzalloc(RMR_MD_SIZE, GFP_KERNEL);
+	if (!md_buf)
+		return;
+	memcpy(md_buf, &pool->pool_md, sizeof(pool->pool_md));
+
+	err = process_md_io(pool, NULL, 0, RMR_MD_SIZE, RMR_OP_MD_WRITE, md_buf);
+	if (err)
+		pr_warn("%s: failed to flush pool_md: 0x%x at offset 0 len %lu\n",
+			__func__, err, RMR_MD_SIZE);
+
+	kfree(md_buf);
+}
+
+/**
+ * rmr_srv_flush_last_io() - Write last_io region to disk
+ *
+ * @srv_pool:	Server pool whose last_io is to be flushed
+ *
+ * Description:
+ *	Snapshots srv_pool->last_io into a bounce buffer, mirrors the snapshot into
+ *	srv_pool->last_io_idx when that array exists, and writes the snapshot to the
+ *	last_io region (offset RMR_MD_SIZE). Failures are only logged.
+ */
+static void rmr_srv_flush_last_io(struct rmr_srv_pool *srv_pool)
+{
+	struct rmr_pool *pool = srv_pool->pool;
+	u64 last_io_len = rmr_last_io_len(pool->pool_md.queue_depth);
+	void *buf;
+	int err;
+
+	if (!last_io_len || !srv_pool->last_io)
+		return;
+
+	buf = kzalloc(last_io_len, GFP_KERNEL);
+	if (!buf)
+		return;
+
+	memcpy(buf, srv_pool->last_io, last_io_len);
+
+	/*
+	 * last_io_idx is allocated lazily by rmr_srv_load_last_io() and reset to NULL on the
+	 * rmr_srv_refresh_md() error path, so it may not exist yet when a flush is due.
+	 */
+	if (srv_pool->last_io_idx)
+		memcpy(srv_pool->last_io_idx, buf, last_io_len);
+
+	err = process_md_io(pool, NULL, RMR_MD_SIZE, last_io_len,
+			    RMR_OP_MD_WRITE, buf);
+	if (err)
+		pr_warn("%s: failed to flush last_io: 0x%x at offset %lu len %llu\n",
+			__func__, err, RMR_MD_SIZE, last_io_len);
+	kfree(buf);
+}
+
+/**
+ * rmr_srv_md_load_buf() - Load the server metadata from buffer to the server pool.
+ *
+ * @pool:	pool whose pool_md receives the entries
+ * @buf:	buffer laid out like the rmr pool metadata structure
+ *
+ * Description:
+ *	Walks the RMR_POOL_MAX_SESS srv_md entries that follow the client-side header
+ *	(RMR_CLT_MD_SIZE bytes) in @buf and copies every entry that belongs to another
+ *	member into the matching slot of pool->pool_md. Entries with member_id 0 and the
+ *	entry of this server pool are skipped.
+ *
+ * Return:
+ *	0 if at least one entry was loaded and none had to be dropped, -EINVAL if no entry
+ *	was loaded or if any entry found no slot in pool_md.
+ */
+static int rmr_srv_md_load_buf(struct rmr_pool *pool, void *buf)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	struct rmr_srv_md *entries = buf + RMR_CLT_MD_SIZE;
+	bool loaded = false;
+	int err = 0;
+	int i;
+
+	for (i = 0; i < RMR_POOL_MAX_SESS; i++) {
+		struct rmr_srv_md *entry = &entries[i];
+		u8 member_id = entry->member_id;
+		int index;
+
+		/* skip updating the srv_md of this server pool */
+		if (!member_id || member_id == srv_pool->member_id)
+			continue;
+
+		index = rmr_pool_find_md(&pool->pool_md, member_id, true);
+		if (index < 0) {
+			pr_debug("%s: No space in the pool_md for new member %d\n",
+				 __func__, member_id);
+			/* remember the failure but keep loading the remaining entries */
+			err = -EINVAL;
+			continue;
+		}
+
+		pr_debug("Load srv_md[%d] with member_id %d\n", index, member_id);
+		memcpy(&pool->pool_md.srv_md[index], entry, sizeof(struct rmr_srv_md));
+		loaded = true;
+	}
+
+	if (!loaded) {
+		pr_debug("No server metadata found in the buffer\n");
+		err = -EINVAL;
+	}
+
+	return err;
+}
+
+/**
+ * rmr_srv_md_process_buf() - Load the metadata from buffer to the server pool.
+ *
+ * @pool:	pool whose pool_md is updated
+ * @buf:	buffer in the format of the rmr pool metadata structure
+ * @sync:	false to copy only the client-side header (RMR_CLT_MD_SIZE bytes),
+ *		true to load the per-server srv_md entries via rmr_srv_md_load_buf()
+ *
+ * Return:
+ *	0 on success, or the error returned by rmr_srv_md_load_buf() when @sync is true.
+ */
+int rmr_srv_md_process_buf(struct rmr_pool *pool, void *buf, bool sync)
+{
+	int err;
+
+	if (!sync) {
+		/* Copy only the client-side header. */
+		memcpy(&pool->pool_md, buf, RMR_CLT_MD_SIZE);
+		return 0;
+	}
+
+	err = rmr_srv_md_load_buf(pool, buf);
+	if (err)
+		pr_err("Failed to load md buf to pool %s\n", pool->poolname);
+
+	return err;
+}
+
+/**
+ * rmr_srv_send_md_update() - Send RMR_CMD_MD_SEND to the peers and load their replies
+ *
+ * @pool:	pool whose server pool initiates the metadata update
+ *
+ * Description:
+ *	Sends an RMR_CMD_MD_SEND command with an RMR_MD_SIZE data buffer through the sync
+ *	pool (srv_pool->clt) and loads the returned buffer with rmr_srv_md_load_buf().
+ *
+ * Return:
+ *	0 on success, -EINVAL if the server pool is not in state NORMAL, -ENXIO if no sync
+ *	pool is assigned, -ENOMEM on allocation failure, or the error of the send / load
+ *	step.
+ */
+int rmr_srv_send_md_update(struct rmr_pool *pool)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	struct rmr_pool *sync_pool = srv_pool->clt;
+	struct rmr_msg_pool_cmd msg = {};
+	int buflen = RMR_MD_SIZE;
+	void *buf;
+	int err;
+
+	/* Only normal-state server pools should send metadata updates. */
+	if (atomic_read(&srv_pool->state) != RMR_SRV_POOL_STATE_NORMAL)
+		return -EINVAL;
+
+	/* For a stg node A, is A->B alive? */
+	if (!sync_pool) {
+		pr_debug("pool %s has no sync pool assigned. Cannot send md update commands.\n",
+			 pool->poolname);
+		return -ENXIO;
+	}
+
+	buf = kzalloc(buflen, GFP_KERNEL);
+	if (!buf)
+		return -ENOMEM;
+
+	rmr_clt_init_cmd(sync_pool, &msg);
+	msg.cmd_type = RMR_CMD_MD_SEND;
+	/* This node sends messages to start md_update. */
+	msg.md_send_cmd.leader_id = srv_pool->member_id;
+	msg.md_send_cmd.src_mapped_size = pool->mapped_size;
+
+	err = rmr_clt_send_cmd_with_data_all(sync_pool, &msg, buf, buflen);
+	if (err < 0) {
+		pr_debug("pool %s sends all sess RMR_CMD_MD_SEND failed\n", pool->poolname);
+		goto free_buf;
+	}
+
+	/*
+	 * keep the original slice of buffer if the corresponding send req failed.
+	 *
+	 * TODO:
+	 * We need to use the err received from rmr_clt_send_cmd_with_data_all in this function,
+	 * and match the sessions we are skipping.
+	 *
+	 * In general, the sessions_skipped == (RMR_POOL_MAX_SESS - (number_of_legs - 1 - err).
+	 * If the above number does not match, then we abandon the buffers, and try again.
+	 */
+	err = rmr_srv_md_load_buf(pool, buf);
+	if (err)
+		pr_debug("Failed to load md buf to pool %s\n", pool->poolname);
+
+free_buf:
+	kfree(buf);
+	return err;
+}
+
+/**
+ * rmr_srv_refresh_md() - Refresh the metadata of the rmr pool.
+ *
+ * @srv_pool: Server pool whose metadata is to be loaded
+ *
+ * Description:
+ *	Reads the pool metadata from the backing store, validates it against the pool
+ *	(magic, chunk size, mapped size of this member), imports it into pool->pool_md and
+ *	then loads the last_io array and the dirty maps.
+ *
+ * Return:
+ *	0 on success.
+ *	-ENOMEM on allocation failure.
+ *	-EINVAL if the store holds no valid metadata (magic mismatch) or if the chunk size
+ *	or the mapped size does not match the pool.
+ *	The negative value returned by rmr_pool_find_md() if the metadata has no entry for
+ *	this member.
+ *	Otherwise the error returned by rmr_srv_read_md(), rmr_srv_load_last_io() or
+ *	rmr_srv_refresh_md_maps().
+ */
+int rmr_srv_refresh_md(struct rmr_srv_pool *srv_pool)
+{
+	struct rmr_pool_md *pool_md_page;
+	struct rmr_pool *pool = srv_pool->pool;
+	int index, ret;
+	u64 md_ver;
+
+	pool_md_page = kzalloc(RMR_MD_SIZE, GFP_KERNEL);
+	if (!pool_md_page)
+		return -ENOMEM;
+
+	/* Keep the read error so that the caller does not see an uninitialised value */
+	ret = rmr_srv_read_md(pool, NULL, 0, RMR_MD_SIZE, pool_md_page);
+	if (ret) {
+		pr_err("%s: failed reading md of rmr\n", __func__);
+		goto free_md;
+	}
+
+	pr_info("%s: Read md of pool %s from store with magic 0x%llx\n",
+		__func__, pool_md_page->poolname, pool_md_page->magic);
+
+	if (pool_md_page->magic != RMR_POOL_MD_MAGIC) {
+		pr_info("%s: No valid md found on the store for pool %s\n",
+			__func__, pool->poolname);
+		ret = -EINVAL;
+		goto free_md;
+	}
+
+	/*
+	 * TODO: Should we sanity check other params also?
+	 */
+	if (pool_md_page->chunk_size != pool->chunk_size) {
+		pr_err("%s: chunk size mismatched. pool chunk size %u, md chunk size %u\n",
+		       __func__, pool->chunk_size, pool_md_page->chunk_size);
+		ret = -EINVAL;
+		goto free_md;
+	}
+
+	/* Import the metadata to the states of the pool. */
+	index = rmr_pool_find_md(pool_md_page, srv_pool->member_id, false);
+	if (index < 0) {
+		pr_info("%s: No md found for member_id %d\n", __func__, srv_pool->member_id);
+		ret = index;
+		goto free_md;
+	}
+
+	if (pool_md_page->srv_md[index].mapped_size != pool->mapped_size) {
+		/* Log the value that was actually compared */
+		pr_err("%s: Mapped size mismatched. The srv pool %llu, md %llu\n",
+		       __func__, pool->mapped_size, pool_md_page->srv_md[index].mapped_size);
+		ret = -EINVAL;
+		goto free_md;
+	}
+
+	/* Never move the in-memory map version backwards */
+	md_ver = pool_md_page->srv_md[index].map_ver;
+	if (md_ver < pool->map_ver)
+		pr_err("The current map ver is %lld but the map ver on md is %lld.\n",
+		       pool->map_ver, md_ver);
+	else
+		pool->map_ver = md_ver;
+
+	pool->pool_md = *pool_md_page;
+
+	ret = rmr_srv_load_last_io(srv_pool);
+	if (ret) {
+		pr_err("%s: failed to load last_io array to memory with err 0x%x\n",
+		       __func__, ret);
+		goto zero_md;
+	}
+
+	pr_info("%s: no_of_chunks %lld\n", __func__, pool->no_of_chunks);
+	ret = rmr_srv_refresh_md_maps(srv_pool);
+	if (ret) {
+		pr_err("%s: failed to load dirty bitmap to memory with err %pe\n",
+		       __func__, ERR_PTR(ret));
+		goto free_last_io;
+	}
+	goto free_md;
+
+free_last_io:
+	kfree(srv_pool->last_io_idx);
+	srv_pool->last_io_idx = NULL;
+zero_md:
+	/* Undo the import so that no half-validated metadata stays in the pool */
+	memset(&pool->pool_md, 0, sizeof(pool->pool_md));
+free_md:
+	kfree(pool_md_page);
+	return ret;
+}
+
+/**
+ * rmr_srv_mark_maps_dirty() - Set MD_DIRTY_MAPS and schedule delayed sync
+ *
+ * @srv_pool:	Server pool with changed maps
+ *
+ * Description:
+ *	Flags the maps region as dirty and (re)arms md_sync_dwork on md_sync_wq to run
+ *	after RMR_SRV_MD_SYNC_INTERVAL_MS milliseconds.
+ */
+void rmr_srv_mark_maps_dirty(struct rmr_srv_pool *srv_pool)
+{
+	unsigned long delay = msecs_to_jiffies(RMR_SRV_MD_SYNC_INTERVAL_MS);
+
+	set_bit(MD_DIRTY_MAPS, &srv_pool->md_dirty);
+	mod_delayed_work(srv_pool->md_sync_wq, &srv_pool->md_sync_dwork, delay);
+}
+
+/**
+ * rmr_srv_md_sync - sync dirty metadata regions of pool
+ *
+ * @work:	the work_struct embedded in srv_pool->md_sync_dwork
+ *
+ * Description:
+ *	Dirty-driven consumer: only flushes regions whose dirty bit is set.
+ *	Producers set bits and schedule this work via mod_delayed_work().
+ *	Does NOT re-queue itself — the next dirty event will schedule it.
+ */
+void rmr_srv_md_sync(struct work_struct *work)
+{
+	struct delayed_work *dwork = to_delayed_work(work);
+	struct rmr_srv_pool *srv_pool = container_of(dwork, struct rmr_srv_pool, md_sync_dwork);
+	struct rmr_pool *pool;
+	bool flushed = false;
+
+	if (!srv_pool->pool)
+		return;
+
+	/*
+	 * The pool may be going away while this work runs. Hold a reference on the server
+	 * pool for the duration of the sync.
+	 */
+	if (!rmr_get_srv_pool(srv_pool)) {
+		pr_err("%s: pool is not there\n", __func__);
+		return;
+	}
+
+	pool = srv_pool->pool;
+
+	/*
+	 * Update srv_md snapshot and notify peers whenever any region is dirty.
+	 */
+	if (!rmr_srv_md_update(srv_pool)) {
+		if (rmr_srv_send_md_update(pool))
+			pr_debug("failed to send md update\n");
+	}
+
+	/*
+	 * The io store is ready after the store is registered and the pool metadata is
+	 * updated, if any. Nothing is written before that.
+	 */
+	if (atomic_read(&srv_pool->store_state) && pool->mapped_size) {
+		/*
+		 * On-disk layout of rmr pool metadata:
+		 *
+		 *   0           RMR_MD_SIZE   +last_io_len    +PAGE_SIZE
+		 *   +-----------+-------------+---------------+--------------------+
+		 *   | pool_md   | last_io     | hdr_region    | maps_region ...    |
+		 *   +-----------+-------------+---------------+--------------------+
+		 *   <-RMR_MD_SIZE><-last_io_len><--PAGE_SIZE--><-per_map slp pages->
+		 *
+		 * pool->maps[0:maps_cnt] is always dense (no NULL gaps).
+		 *
+		 * pool_md and last_io are flushed here. hdr_region and maps_region are
+		 * written separately by rmr_srv_md_maps_sync().
+		 */
+		if (test_and_clear_bit(MD_DIRTY_POOL, &srv_pool->md_dirty)) {
+			rmr_srv_flush_pool_md(srv_pool);
+			flushed = true;
+		}
+
+		if (test_and_clear_bit(MD_DIRTY_LAST_IO, &srv_pool->md_dirty)) {
+			rmr_srv_flush_last_io(srv_pool);
+			flushed = true;
+		}
+
+		if (test_and_clear_bit(MD_DIRTY_MAPS, &srv_pool->md_dirty)) {
+			rmr_srv_md_maps_sync(pool);
+			flushed = true;
+		}
+
+		if (flushed)
+			pr_debug("%s: flushed dirty regions for server pool %u of %s\n",
+				 __func__, srv_pool->member_id, pool->poolname);
+	}
+
+	rmr_put_srv_pool(srv_pool);
+	/* Do NOT re-queue. Producers schedule us via mod_delayed_work. */
+}
diff --git a/drivers/infiniband/ulp/rmr/rmr-srv.c b/drivers/infiniband/ulp/rmr/rmr-srv.c
new file mode 100644
index 000000000000..66af29b90c53
--- /dev/null
+++ b/drivers/infiniband/ulp/rmr/rmr-srv.c
@@ -0,0 +1,3306 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Reliable multicast over RTRS (RMR)
+ *
+ * Copyright (c) 2026 IONOS SE
+ */
+
+#undef pr_fmt
+#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
+
+#include <linux/module.h>
+#include <linux/blkdev.h>
+
+#include "rmr-srv.h"
+#include "rmr-req.h"
+#include "rmr-clt.h"
+
+MODULE_AUTHOR("The RMR and BRMR developers");
+MODULE_VERSION(RMR_VER_STRING);
+MODULE_DESCRIPTION("RMR Server");
+MODULE_LICENSE("GPL");
+
+static struct rtrs_srv_ctx *rtrs_ctx;
+struct kmem_cache *rmr_req_cachep;
+
+static LIST_HEAD(g_sess_list);
+static DEFINE_MUTEX(g_sess_lock);
+
+/*
+ * Chunk size limits. The values used by the code are in bytes; the *_KB variants exist
+ * so that the default can be printed as a plain number in the parameter description.
+ */
+#define MIN_CHUNK_SIZE_KB 128
+#define MAX_CHUNK_SIZE_KB 1024
+#define DEFAULT_CHUNK_SIZE_KB MIN_CHUNK_SIZE_KB
+
+#define MIN_CHUNK_SIZE (MIN_CHUNK_SIZE_KB << 10)
+#define MAX_CHUNK_SIZE (MAX_CHUNK_SIZE_KB << 10)
+#define DEFAULT_CHUNK_SIZE (DEFAULT_CHUNK_SIZE_KB << 10)
+
+/* The variable type must match the 'uint' type given to module_param_named() */
+static unsigned int __read_mostly chunk_size = DEFAULT_CHUNK_SIZE;
+
+module_param_named(chunk_size, chunk_size, uint, 0444);
+MODULE_PARM_DESC(chunk_size,
+		 "Unit size in bytes which is tracked for being dirty. (default: "
+		 /* cppcheck-suppress unknownMacro */
+		 __stringify(DEFAULT_CHUNK_SIZE_KB) " KB)");
+
+static unsigned int __read_mostly sync_queue_depth = DEFAULT_SYNC_QUEUE_DEPTH;
+
+module_param_named(sync_queue_depth, sync_queue_depth, uint, 0644);
+MODULE_PARM_DESC(sync_queue_depth,
+		 "Max in-flight sync requests per pool (default: "
+		 __stringify(DEFAULT_SYNC_QUEUE_DEPTH) ")");
+
+/**
+ * rmr_get_srv_pool() - Take a reference on a server pool
+ *
+ * @srv_pool:	Server pool to take the reference on
+ *
+ * Return:
+ *	true if the reference was taken, false if the refcount had already dropped to zero.
+ */
+bool rmr_get_srv_pool(struct rmr_srv_pool *srv_pool)
+{
+	bool got;
+
+	pr_debug("pool %s, before inc refcount %d\n",
+		 srv_pool->pool->poolname, refcount_read(&srv_pool->refcount));
+	got = refcount_inc_not_zero(&srv_pool->refcount);
+
+	return got;
+}
+
+/**
+ * rmr_find_and_get_srv_pool() - Look up a server pool by group id and take a reference
+ *
+ * @group_id:	group id of the pool to look up
+ *
+ * Description:
+ *	The lookup and the reference grab both happen under pool_mutex.
+ *
+ * Return:
+ *	The referenced server pool, ERR_PTR(-ENOENT) if no pool has this group id, or
+ *	ERR_PTR(-EINVAL) if the reference could not be taken.
+ */
+static struct rmr_srv_pool *rmr_find_and_get_srv_pool(u32 group_id)
+{
+	struct rmr_srv_pool *srv_pool;
+	struct rmr_pool *pool;
+
+	mutex_lock(&pool_mutex);
+
+	pool = rmr_find_pool_by_group_id(group_id);
+	if (!pool) {
+		srv_pool = ERR_PTR(-ENOENT);
+		goto unlock;
+	}
+
+	srv_pool = (struct rmr_srv_pool *)pool->priv;
+	if (!rmr_get_srv_pool(srv_pool))
+		srv_pool = ERR_PTR(-EINVAL);
+
+unlock:
+	mutex_unlock(&pool_mutex);
+	return srv_pool;
+}
+
+/**
+ * rmr_put_srv_pool() - Drop a reference on a server pool
+ *
+ * @srv_pool:	Server pool whose reference is dropped
+ *
+ * Description:
+ *	When the last reference goes away the server pool is torn down: the sync client
+ *	pool is closed, the last_io arrays and the rmr pool are freed, both delayed works
+ *	are cancelled, their workqueues destroyed and the server pool itself is freed.
+ *
+ * Context:
+ *	May sleep (might_sleep(), cancel_delayed_work_sync()).
+ */
+void rmr_put_srv_pool(struct rmr_srv_pool *srv_pool)
+{
+	struct rmr_pool *pool = srv_pool->pool;
+
+	might_sleep();
+
+	pr_debug("pool %s, before dec refcnt %d\n",
+		 (pool ? pool->poolname : "(empty)"), refcount_read(&srv_pool->refcount));
+	if (refcount_dec_and_test(&srv_pool->refcount)) {
+		mutex_destroy(&srv_pool->srv_pool_lock);
+
+		if (srv_pool->clt)
+			rmr_clt_close(srv_pool->clt);
+
+		kfree(srv_pool->last_io);
+		srv_pool->last_io = NULL;
+		kfree(srv_pool->last_io_idx);
+		srv_pool->last_io_idx = NULL;
+
+		if (pool) {
+			pr_info("srv: destroy pool %s\n", pool->poolname);
+			free_pool(pool);
+		}
+
+		/*
+		 * NOTE(review): the delayed works are cancelled only after the pool and the
+		 * last_io arrays were freed, and srv_pool->pool is not cleared. A work item
+		 * that starts in this window still sees a non-NULL srv_pool->pool (e.g. in
+		 * the pr_debug of rmr_get_srv_pool()). Cancelling first looks safer, but it
+		 * is only valid if nothing above can queue work on these workqueues —
+		 * confirm before reordering.
+		 */
+		cancel_delayed_work_sync(&srv_pool->md_sync_dwork);
+		destroy_workqueue(srv_pool->md_sync_wq);
+
+		cancel_delayed_work_sync(&srv_pool->clean_dwork);
+		destroy_workqueue(srv_pool->clean_wq);
+
+		kfree(srv_pool);
+	}
+}
+
+/**
+ * rmr_get_srv_pool_state_name() - Printable name of a server pool state
+ *
+ * @state:	state to translate
+ *
+ * Return:
+ *	The name of @state, or "Unknown state" for any value without a name.
+ */
+static const char *rmr_get_srv_pool_state_name(enum rmr_srv_pool_state state)
+{
+	static const char * const state_names[] = {
+		[RMR_SRV_POOL_STATE_EMPTY]	= "RMR_SRV_POOL_STATE_EMPTY",
+		[RMR_SRV_POOL_STATE_REGISTERED]	= "RMR_SRV_POOL_STATE_REGISTERED",
+		[RMR_SRV_POOL_STATE_CREATED]	= "RMR_SRV_POOL_STATE_CREATED",
+		[RMR_SRV_POOL_STATE_NORMAL]	= "RMR_SRV_POOL_STATE_NORMAL",
+		[RMR_SRV_POOL_STATE_NO_IO]	= "RMR_SRV_POOL_STATE_NO_IO",
+	};
+	unsigned int idx = state;
+
+	/* Out-of-range values and gaps in the table both map to the fallback name */
+	if (idx < ARRAY_SIZE(state_names) && state_names[idx])
+		return state_names[idx];
+
+	return "Unknown state";
+}
+
+/**
+ * rmr_srv_change_pool_state() - Change srv pool state
+ *
+ * @srv_pool:	Server pool whose state is to be changed
+ * @new_state:	State to which the transition is to be made
+ *
+ * Return:
+ *	old state on success (also when the pool already is in @new_state)
+ *	-EINVAL on an illegal transition or an unknown @new_state
+ *
+ * Description:
+ *	This function controls the state transitions for rmr-srv pool state.
+ *	Every state transition is controlled by this except to NORMAL.
+ *	Function rmr_srv_set_pool_state_normal handles transition to state NORMAL.
+ *	"always-invalid" state transitions are checked and prevented here.
+ *	Case dependent valid/invalid state transitions should be handled by the caller.
+ *	Every successful change marks the pool metadata dirty.
+ *
+ *	NOTE(review): except for the transition to CREATED, the state is read once and
+ *	then written with atomic_set(); no lock is taken in this function. Presumably the
+ *	callers serialise state changes — TODO confirm.
+ */
+static inline int rmr_srv_change_pool_state(struct rmr_srv_pool *srv_pool,
+					    enum rmr_srv_pool_state new_state)
+{
+	enum rmr_srv_pool_state old_state = atomic_read(&srv_pool->state);
+	int cmp_state;
+
+	/* NORMAL must be entered through rmr_srv_set_pool_state_normal() */
+	WARN_ON(new_state == RMR_SRV_POOL_STATE_NORMAL);
+
+	/* Nothing to do; the metadata is not marked dirty in this case */
+	if (old_state == new_state)
+		return old_state;
+
+	pr_info("%s: Old state %s, Requested state %s\n",
+		__func__, rmr_get_srv_pool_state_name(old_state),
+		rmr_get_srv_pool_state_name(new_state));
+
+	switch (new_state) {
+	case RMR_SRV_POOL_STATE_NO_IO:
+		/*
+		 * NO_IO can be reached from REGISTERED, CREATED, or NORMAL.
+		 * EMPTY -> NO_IO is illegal: a pool with no store cannot have
+		 * active sessions that fail.
+		 */
+		if (WARN_ON(old_state == RMR_SRV_POOL_STATE_EMPTY))
+			goto err;
+		atomic_set(&srv_pool->state, RMR_SRV_POOL_STATE_NO_IO);
+		break;
+	case RMR_SRV_POOL_STATE_EMPTY:
+		/*
+		 * EMPTY is reached from REGISTERED (store unregistered, no
+		 * sessions) or from NO_IO (last session left, no store). A
+		 * direct jump from CREATED or NORMAL is illegal — those states
+		 * must pass through NO_IO first.
+		 */
+		if (WARN_ON(old_state == RMR_SRV_POOL_STATE_CREATED ||
+			    old_state == RMR_SRV_POOL_STATE_NORMAL))
+			goto err;
+		atomic_set(&srv_pool->state, RMR_SRV_POOL_STATE_EMPTY);
+		break;
+	case RMR_SRV_POOL_STATE_REGISTERED:
+		/*
+		 * REGISTERED is entered from EMPTY (store just registered, no
+		 * sessions) or from NO_IO (last session left, store still
+		 * present). A direct jump from CREATED or NORMAL is illegal —
+		 * those states must pass through NO_IO first.
+		 */
+		if (WARN_ON(old_state == RMR_SRV_POOL_STATE_CREATED ||
+			    old_state == RMR_SRV_POOL_STATE_NORMAL))
+			goto err;
+		atomic_set(&srv_pool->state, RMR_SRV_POOL_STATE_REGISTERED);
+
+		break;
+	case RMR_SRV_POOL_STATE_CREATED:
+		/*
+		 * CREATED is entered only from REGISTERED, when the first
+		 * non-sync create-mode join arrives. Any other predecessor
+		 * is illegal. The compare-and-exchange makes this the only
+		 * transition that is checked and applied atomically.
+		 */
+		cmp_state = RMR_SRV_POOL_STATE_REGISTERED;
+		if (atomic_try_cmpxchg(&srv_pool->state, &cmp_state, RMR_SRV_POOL_STATE_CREATED))
+			goto out;
+		WARN_ON(1);
+		goto err;
+	default:
+		pr_err("%s: Unknown state %d\n", __func__, new_state);
+		goto err;
+	}
+
+out:
+	/* The new state is part of the persisted srv_md, so schedule a metadata flush */
+	rmr_srv_mark_pool_md_dirty(srv_pool);
+	return old_state;
+
+err:
+	pr_err("%s: Failed. Old state %s, Requested state %s\n",
+	       __func__, rmr_get_srv_pool_state_name(old_state),
+	       rmr_get_srv_pool_state_name(new_state));
+	return -EINVAL;
+}
+
+/**
+ * rmr_srv_set_pool_state_normal() - Change srv pool state to NORMAL
+ *
+ * @srv_pool:	Server pool whose state is to be changed to NORMAL
+ *
+ * Return:
+ *	old state on success (also when the pool already is NORMAL)
+ *	-EINVAL if the pool is neither CREATED nor NO_IO
+ *
+ * Description:
+ *	This function controls the state transition of the rmr-srv pool state to NORMAL.
+ *	It runs under srv_pool->srv_pool_lock. "always-invalid" state transitions are
+ *	checked and prevented here; case dependent valid/invalid state transitions
+ *	should be handled by the caller.
+ */
+static int rmr_srv_set_pool_state_normal(struct rmr_srv_pool *srv_pool)
+{
+	int prev_state, ret;
+
+	mutex_lock(&srv_pool->srv_pool_lock);
+
+	prev_state = atomic_read(&srv_pool->state);
+	ret = prev_state;
+
+	pr_info("%s: Old state %s\n", __func__,
+		rmr_get_srv_pool_state_name(prev_state));
+
+	switch (prev_state) {
+	case RMR_SRV_POOL_STATE_NORMAL:
+		/* Already there, nothing to change */
+		break;
+	case RMR_SRV_POOL_STATE_CREATED:
+		/* CREATED -> NORMAL: normal enable on a newly created pool. */
+	case RMR_SRV_POOL_STATE_NO_IO:
+		/* NO_IO -> NORMAL: map update completed, pool can serve IOs again. */
+		atomic_set(&srv_pool->state, RMR_SRV_POOL_STATE_NORMAL);
+		rmr_srv_mark_pool_md_dirty(srv_pool);
+		pr_info("%s: Server pool state changed to NORMAL\n", __func__);
+		break;
+	default:
+		/* Any other predecessor is illegal. */
+		WARN_ON(1);
+		ret = -EINVAL;
+		break;
+	}
+
+	mutex_unlock(&srv_pool->srv_pool_lock);
+
+	return ret;
+}
+
+/**
+ * rmr_srv_clear_map() - clear the dirty map if other pool member completely synced it
+ *
+ * @pool:	rmr pool that holds the maps to clean
+ * @member_id:	pool member id for which map is reported as clean
+ *
+ * Description:
+ *	If another pool member responded that it finished syncing its data, the
+ *	copy of its map replicated to this node can be cleared. This covers the
+ *	case where some clear commands were lost or failed.
+ *
+ *	The map is only wiped while its check_state is still
+ *	RMR_MAP_STATE_CHECKING. If the state is different on entry nothing is
+ *	touched; if it changes while the chunks are being cleared, the chunk
+ *	just cleared is marked dirty again and the walk stops.
+ *
+ * Return:
+ *	no
+ *
+ * Context:
+ *	This function can wait on spin_lock if the deleted entry should be inserted back
+ *
+ * Locks:
+ *	Takes pool->map_srcu for reading while the map is looked up and walked.
+ */
+static void rmr_srv_clear_map(struct rmr_pool *pool, u8 member_id)
+{
+	/*
+	 * TODO: this looks like rmr_pool_map_remove_entries, can we do something about this?
+	 * I was not able to merge them, but it would be nice.
+	 */
+	struct rmr_dirty_id_map *map = NULL;
+	rmr_id_t id;
+	int i, lock_idx;
+
+	pr_debug("pool %s clear map entries for member_id=%u\n",
+		 pool->poolname, member_id);
+
+	lock_idx = srcu_read_lock(&pool->map_srcu);
+	map = rmr_pool_find_map(pool, member_id);
+	if (!map) {
+		pr_err("for pool %s cannot find map for member id %u\n", pool->poolname, member_id);
+		goto unlock;
+	}
+
+	/* if the map state changed since we send our CHECK_MAP command, it means that
+	 * some entries were added and the map is not clean and we should not wipe it.
+	 * rsp of CHECK_MAP cmd can be outdated a little so we do not trust it then.
+	 */
+	if (atomic_read(&map->check_state) != RMR_MAP_STATE_CHECKING) {
+		pr_debug("map for member_id=%u cannot be cleared now, state changed\n",
+			 map->member_id);
+		/* Bail out before touching any chunk; the map is not clean. */
+		goto unlock;
+	}
+
+	for (i = 0; i < map->no_of_chunks; i++) {
+		id.a = 1;
+		id.b = i;
+
+		rmr_map_unset_dirty(map, id, MAP_NO_FILTER);
+
+		/* If the state changed since the last check then it is possible that after
+		 * clear_bit of RMR_MAP_STATE_CHECK_CLEAR in the rmr_req_check_map we called
+		 * rmr_map_insert. There we check that entry is already in the map and leave
+		 * the function. But the following erase here would delete it. So we return
+		 * erased entry back to the table if the state of checking changed.
+		 */
+		if (atomic_read(&map->check_state) != RMR_MAP_STATE_CHECKING) {
+			pr_debug("map for member_id=%u cannot be cleared now, state changed\n",
+				 map->member_id);
+
+			rmr_map_set_dirty(map, id, 0);
+			goto unlock;
+		}
+	}
+	pr_debug("clear map entries for member_id=%u is done\n", member_id);
+unlock:
+	srcu_read_unlock(&pool->map_srcu, lock_idx);
+	rmr_srv_mark_maps_dirty((struct rmr_srv_pool *)pool->priv);
+}
+
+/**
+ * rmr_srv_check_map_clear() - periodic work that checks if the other node finished sync
+ *
+ * @work:	delayed work structure to start and repeat the work
+ *
+ * Description:
+ *	Walk the dirty maps of all other pool members. For every map that is
+ *	not empty, ask that member through the sync client pool (srv_pool->clt)
+ *	whether it finished syncing; if it reports so, clear its map locally.
+ *	The walk is skipped when no rmr pool is attached yet, when the pool is
+ *	not in NORMAL state, or when no sync client pool is assigned. In all
+ *	cases the work re-queues itself after RMR_SRV_CHECK_MAPS_INTERVAL_MS.
+ *
+ * Return:
+ *	no
+ *
+ * Context:
+ *	runs in the process context.
+ *
+ * Locks:
+ *	Takes pool->map_srcu for reading while the maps are walked.
+ */
+static void rmr_srv_check_map_clear(struct work_struct *work)
+{
+	struct rmr_srv_pool *srv_pool;
+	struct rmr_pool *pool;
+	int i, lock_idx;
+
+	srv_pool = container_of(to_delayed_work(work), struct rmr_srv_pool, clean_dwork);
+
+	if (!srv_pool->pool) {
+		pr_debug("no rmr pool assigend to srv_pool yet.\n");
+		goto out;
+	}
+
+	pool = srv_pool->pool;
+	pr_debug("check map for srv pool %s started...\n", pool->poolname);
+
+	if (atomic_read(&srv_pool->state) != RMR_SRV_POOL_STATE_NORMAL) {
+		pr_debug("srv pool %s is not in normal state, skip map clear check\n",
+			 pool->poolname);
+		goto out;
+	}
+
+	if (!srv_pool->clt) {
+		pr_debug("srv pool %s does not have sync pool assigned, skip map clear check\n",
+			 pool->poolname);
+		goto out;
+	}
+
+	lock_idx = srcu_read_lock(&pool->map_srcu);
+	for (i = 0; i < pool->maps_cnt; i++) {
+		struct rmr_dirty_id_map *map;
+		u8 member_id;
+		int ret;
+
+		/*
+		 * The read side is protected by map_srcu, not by rcu_read_lock(),
+		 * so the SRCU flavour of the dereference has to be used.
+		 */
+		map = srcu_dereference(pool->maps[i], &pool->map_srcu);
+		if (WARN_ON(!map))
+			break;
+
+		member_id = map->member_id;
+		if (member_id == srv_pool->member_id) {
+			pr_debug("srv pool %s skip checking map with id %u, since it is me.\n",
+				 pool->poolname, member_id);
+			continue;
+		}
+
+		if (rmr_map_empty(map)) {
+			pr_debug("srv pool %s map for member_id=%u is empty, no need to check\n",
+				 pool->poolname, map->member_id);
+			continue;
+		}
+
+		/*
+		 * Mark the map as being checked before asking the member; the
+		 * clear path refuses to wipe the map once this state changed.
+		 */
+		atomic_set(&map->check_state, RMR_MAP_STATE_CHECKING);
+
+		ret = rmr_clt_pool_member_synced(srv_pool->clt, member_id);
+		if (ret < 0) {
+			pr_debug("pool %s failed to check if member_id=%u synced, ret %d\n",
+				 pool->poolname, member_id, ret);
+			atomic_set(&map->check_state, RMR_MAP_STATE_NO_CHECK);
+			continue;
+		}
+
+		pr_debug("pool %s check if pool member %u synced, reported %d\n",
+			 pool->poolname, member_id, ret);
+		if (ret)
+			rmr_srv_clear_map(pool, member_id);
+
+		atomic_set(&map->check_state, RMR_MAP_STATE_NO_CHECK);
+	}
+	srcu_read_unlock(&pool->map_srcu, lock_idx);
+
+	pr_debug("check map for pool %s done. schedule next one.\n", pool->poolname);
+
+out:
+	queue_delayed_work(srv_pool->clean_wq, &srv_pool->clean_dwork,
+			   msecs_to_jiffies(RMR_SRV_CHECK_MAPS_INTERVAL_MS));
+}
+
+/**
+ * rmr_create_srv_pool() - Allocate and initialise a server pool
+ *
+ * @poolname:	Pool name, used to name the two per-pool workqueues.
+ * @member_id:	Member ID of this server in the pool.
+ *
+ * Description:
+ *	Allocates the srv_pool in EMPTY state with one reference, no store and
+ *	a stopped sync thread, creates the map-clean and md-sync workqueues and
+ *	starts the periodic map-clean work. The md-sync work is only
+ *	initialised; it is not queued here.
+ *
+ *	The map-clean work is queued only after every allocation succeeded, so
+ *	no error path can free the srv_pool while the work is pending.
+ *
+ * Return:
+ *	Pointer to the new srv_pool on success, ERR_PTR(-ENOMEM) on failure.
+ */
+struct rmr_srv_pool *rmr_create_srv_pool(char *poolname, u32 member_id)
+{
+	struct rmr_srv_pool *srv_pool;
+
+	srv_pool = kzalloc(sizeof(*srv_pool), GFP_KERNEL);
+	if (unlikely(!srv_pool))
+		return ERR_PTR(-ENOMEM);
+
+	atomic_set(&srv_pool->state, RMR_SRV_POOL_STATE_EMPTY);
+	srv_pool->maintenance_mode = false;
+	refcount_set(&srv_pool->refcount, 1);
+	mutex_init(&srv_pool->srv_pool_lock);
+
+	atomic_set(&srv_pool->store_state, false);
+
+	srv_pool->member_id = member_id;
+	srv_pool->max_sync_io_size = U32_MAX;
+
+	/* Sync thread */
+	srv_pool->th_tsk = NULL;
+	atomic_set(&srv_pool->thread_state, SYNC_THREAD_STOPPED);
+	atomic_set(&srv_pool->in_flight_sync_reqs, 0);
+
+	/* clean outdated entries from the map work */
+	srv_pool->clean_wq = alloc_workqueue("%s_clean_wq", 0, 0, poolname);
+	if (!srv_pool->clean_wq) {
+		pr_err("failed to create wq pool %s\n", poolname);
+		goto err_free_pool;
+	}
+
+	/* sync metadata of the rmr pool */
+	srv_pool->md_sync_wq = alloc_workqueue("%s_md_sync_wq", 0, 0, poolname);
+	if (!srv_pool->md_sync_wq) {
+		pr_err("failed to create md_sync_wq pool %s\n", poolname);
+		goto err_destroy_clean_wq;
+	}
+
+	INIT_DELAYED_WORK(&srv_pool->md_sync_dwork, rmr_srv_md_sync);
+	/* No initial queue — first dirty event will schedule the work. */
+
+	/* Nothing can fail past this point, it is safe to start the clean work. */
+	INIT_DELAYED_WORK(&srv_pool->clean_dwork, rmr_srv_check_map_clear);
+	queue_delayed_work(srv_pool->clean_wq, &srv_pool->clean_dwork,
+			   msecs_to_jiffies(RMR_SRV_CHECK_MAPS_INTERVAL_MS));
+
+	return srv_pool;
+
+err_destroy_clean_wq:
+	destroy_workqueue(srv_pool->clean_wq);
+err_free_pool:
+	kfree(srv_pool);
+	return ERR_PTR(-ENOMEM);
+}
+
+/**
+ * rmr_srv_pool_update_params() - Apply the configured chunk size to a pool
+ *
+ * @pool:	Pool whose chunk parameters are to be set.
+ *
+ * Copies chunk_size into pool->chunk_size and stores ilog2(chunk_size) as
+ * pool->chunk_size_shift.
+ *
+ * NOTE(review): the shift only matches the size when chunk_size is a power
+ * of two -- confirm this is validated where chunk_size is set.
+ */
+void rmr_srv_pool_update_params(struct rmr_pool *pool)
+{
+	pr_info("%s: Setting chunk_size for pool %s to %d\n",
+		__func__, pool->poolname, chunk_size);
+	pool->chunk_size = chunk_size;
+	pool->chunk_size_shift = ilog2(chunk_size);
+}
+
+/*
+ * Look up the pool registered under @group_id in the session's pool xarray
+ * and take a reference on its server pool.
+ *
+ * Returns the pool on success, ERR_PTR(-ENXIO) if there is no such pool or
+ * the reference could not be taken. A successful call is paired with
+ * rmr_srv_sess_put_pool().
+ */
+static struct rmr_pool *rmr_srv_sess_get_pool(struct rmr_srv_sess *srv_sess, u32 group_id)
+{
+	struct rmr_pool *found;
+
+	rcu_read_lock();
+
+	found = xa_load(&srv_sess->pools, group_id);
+	if (!found || !rmr_get_srv_pool((struct rmr_srv_pool *)found->priv))
+		found = ERR_PTR(-ENXIO);
+
+	rcu_read_unlock();
+
+	return found;
+}
+
+/* Drop the server pool reference taken by rmr_srv_sess_get_pool(). */
+static void rmr_srv_sess_put_pool(struct rmr_pool *pool)
+{
+	rmr_put_srv_pool((struct rmr_srv_pool *)pool->priv);
+}
+
+/**
+ * rmr_srv_endreq() - Function called when an rmr server request finishes processing
+ *
+ * @req:	Pointer to the request ending
+ * @err:	Error value. Would be 0 for a successful request
+ *
+ * Description:
+ *	Metadata requests (RMR_OP_MD_WRITE / RMR_OP_MD_READ) only log a failure
+ *	and drop the inflight reference; no rtrs response is sent for them.
+ *
+ *	For every other request: a failed non-sync request moves the pool to
+ *	NO_IO. Otherwise, for a write, the request id is recorded as last IO of
+ *	req->mem_id, the md sync work is kicked, and the chunk is marked dirty
+ *	in the map of every server listed in req->failed_srv_id. If one of
+ *	those maps cannot be found the request is completed with -EINVAL.
+ *	Finally the rtrs response is sent and the pool reference is dropped.
+ */
+void rmr_srv_endreq(struct rmr_srv_req *req, int err)
+{
+	struct rmr_srv_pool *srv_pool = req->srv_pool;
+	struct rmr_pool *pool = srv_pool->pool;
+	struct rtrs_srv_op *rtrs_op = req->rtrs_op;
+	struct rmr_dirty_id_map *map;
+	int i;
+
+	if (req->flags == RMR_OP_MD_WRITE || req->flags == RMR_OP_MD_READ) {
+		if (unlikely(err))
+			pr_err("Failed to complete the md req %x\n", req->flags);
+		goto put_ref;
+	}
+
+	if (unlikely(err) && !req->sync) {
+		rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_NO_IO);
+	} else if (rmr_op(req->flags) == RMR_OP_WRITE) {
+		srv_pool->last_io[req->mem_id].a = req->id.a;
+		srv_pool->last_io[req->mem_id].b = req->id.b;
+
+		if (!test_and_set_bit(MD_DIRTY_LAST_IO, &srv_pool->md_dirty)) {
+			mod_delayed_work(srv_pool->md_sync_wq,
+					 &srv_pool->md_sync_dwork,
+					 msecs_to_jiffies(RMR_SRV_MD_SYNC_INTERVAL_MS));
+		}
+
+		for (i = 0; i < req->failed_cnt; i++) {
+			map = rmr_pool_find_map(pool, req->failed_srv_id[i]);
+			if (!map) {
+				pr_err("Cannot find map for srv_id %u\n", req->failed_srv_id[i]);
+				/*
+				 * Must update the function-level err (no shadowing
+				 * local), otherwise the failure never reaches the
+				 * response sent below.
+				 */
+				err = -EINVAL;
+				goto out;
+			}
+
+			atomic_set(&map->check_state, RMR_MAP_STATE_NO_CHECK);
+			rmr_map_set_dirty(map, req->id, 0);
+
+			if (req->map_ver > pool->map_ver)
+				pool->map_ver = req->map_ver;
+		}
+		if (req->failed_cnt) {
+			rmr_srv_mark_pool_md_dirty(srv_pool);
+			rmr_srv_mark_maps_dirty(srv_pool);
+		}
+	}
+
+out:
+	/*
+	 * Only reached for non-md requests. The md requests created by rmr-srv
+	 * itself don't use rtrs_op and jump straight to put_ref.
+	 */
+	rtrs_srv_resp_rdma(rtrs_op, err);
+	rmr_srv_sess_put_pool(pool);
+put_ref:
+	percpu_ref_put(&pool->ids_inflight_ref);
+}
+
+/*
+ * Mark the store of @pool as unavailable and, if the sync thread is not
+ * already stopped, request it to stop and wait until it reports
+ * SYNC_THREAD_STOPPED, re-checking after sleeps of up to one second.
+ *
+ * NOTE(review): the wait sleeps in TASK_INTERRUPTIBLE; with a signal pending
+ * schedule_timeout() returns immediately, so the loop may spin -- confirm
+ * whether an uninterruptible sleep is intended.
+ */
+static void rmr_srv_stop_sync_and_unset_store(struct rmr_pool *pool)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+
+	atomic_set(&srv_pool->store_state, false);
+
+	/* Sync thread not running, nothing to wait for. */
+	if (atomic_read(&srv_pool->thread_state) == SYNC_THREAD_STOPPED)
+		return;
+
+	atomic_set(&srv_pool->thread_state, SYNC_THREAD_REQ_STOP);
+	wake_up_process(srv_pool->th_tsk);
+
+	for (;;) {
+		if (atomic_read(&srv_pool->thread_state) == SYNC_THREAD_STOPPED)
+			break;
+
+		set_current_state(TASK_INTERRUPTIBLE);
+		schedule_timeout(msecs_to_jiffies(1000));
+	}
+}
+
+/*
+ * Remove storage member @id from @pool: drop its dirty map, erase it from
+ * pool->stg_members and flag the maps as dirty.
+ */
+static void rmr_srv_delete_store_member(struct rmr_pool *pool, unsigned long id)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+
+	rmr_pool_remove_map(pool, id);
+	xa_erase(&pool->stg_members, id);
+
+	rmr_srv_mark_maps_dirty(srv_pool);
+}
+
+/**
+ * rmr_srv_add_store_member() - Register a storage member and create its dirty map
+ *
+ * @pool:	The pool to which the member belongs.
+ * @id:		Member ID of the storage node to add.
+ *
+ * Fails with -EEXIST if a dirty map for @id is already present. Otherwise
+ * @id is recorded in pool->stg_members and a dirty map is created for it.
+ * If the map cannot be created, the stg_members entry is erased again
+ * before the error is returned.
+ *
+ * Return:
+ *	0 on success, negative error code on failure.
+ */
+static int rmr_srv_add_store_member(struct rmr_pool *pool, unsigned long id)
+{
+	struct rmr_dirty_id_map *new_map;
+	int err;
+
+	if (rmr_pool_find_map(pool, id)) {
+		pr_err("%s: pool %s, member_id %lu map already exists\n",
+		       __func__, pool->poolname, id);
+		return -EEXIST;
+	}
+
+	err = xa_err(xa_store(&pool->stg_members, id, XA_TRUE, GFP_KERNEL));
+	if (err) {
+		pr_err("%s: Failed to add storage member %lu: %d\n",
+		       __func__, id, err);
+		return err;
+	}
+
+	/* Create the map of the newly added member. */
+	new_map = rmr_map_create(pool, id);
+	if (!IS_ERR(new_map))
+		return 0;
+
+	err = PTR_ERR(new_map);
+	pr_err("%s: pool %s, member_id %lu failed to create map on err %d: %pe\n",
+	       __func__, pool->poolname, id, err, new_map);
+
+	/* Undo the stg_members entry recorded above. */
+	xa_erase(&pool->stg_members, id);
+	return err;
+}
+
+/**
+ * rmr_srv_handle_other_member_add() - Handle a POOL_INFO ADD message for a different member
+ *
+ * @srv_pool:		The server pool receiving the notification.
+ * @pool_info_cmd:	The received POOL_INFO command carrying member_id, mode, and dirty.
+ *
+ * %RMR_POOL_INFO_MODE_ASSEMBLE: the member and its dirty map must already
+ * exist on this node, nothing is created.
+ *
+ * %RMR_POOL_INFO_MODE_CREATE for a member other than this server: the member
+ * is added via rmr_srv_add_store_member(); if the command has the dirty flag
+ * set, the new map is marked fully dirty. The maps are then flagged dirty.
+ *
+ * Any other combination is rejected.
+ *
+ * Return:
+ *	0 on success, negative error code on failure.
+ */
+static int rmr_srv_handle_other_member_add(struct rmr_srv_pool *srv_pool,
+					   const struct rmr_msg_pool_info_cmd *pool_info_cmd)
+{
+	struct rmr_pool *pool = srv_pool->pool;
+	struct rmr_dirty_id_map *map;
+	int ret;
+
+	if (pool_info_cmd->mode == RMR_POOL_INFO_MODE_ASSEMBLE) {
+		pr_info("%s: Member %u got add of member %u with mode assemble\n",
+			__func__, srv_pool->member_id, pool_info_cmd->member_id);
+
+		/* For assemble, member info should already exist. */
+		if (xa_load(&pool->stg_members, pool_info_cmd->member_id) != XA_TRUE) {
+			pr_err("%s: pool %s, member_id %u not present\n",
+			       __func__, pool->poolname, pool_info_cmd->member_id);
+			return -ENOENT;
+		}
+
+		if (!rmr_pool_find_map(pool, pool_info_cmd->member_id)) {
+			pr_err("%s: pool %s, member_id %u, map not present\n",
+			       __func__, pool->poolname, pool_info_cmd->member_id);
+			return -ENOENT;
+		}
+
+		return 0;
+	}
+
+	if (pool_info_cmd->mode != RMR_POOL_INFO_MODE_CREATE ||
+	    pool_info_cmd->member_id == srv_pool->member_id) {
+		pr_err("%s: pool %s, member_id %u, unexpected mode %u for ADD operation\n",
+		       __func__, pool->poolname, pool_info_cmd->member_id,
+		       pool_info_cmd->mode);
+		return -EINVAL;
+	}
+
+	pr_info("%s: Member %u got add of member %u with mode create\n",
+		__func__, srv_pool->member_id, pool_info_cmd->member_id);
+
+	ret = rmr_srv_add_store_member(pool, pool_info_cmd->member_id);
+	if (ret) {
+		pr_err("%s: rmr_srv_add_store_member failed %d\n", __func__, ret);
+		return ret;
+	}
+
+	if (pool_info_cmd->dirty) {
+		/* The map was created just above, it has to be there. */
+		map = rmr_pool_find_map(pool, pool_info_cmd->member_id);
+		if (WARN_ON(!map)) {
+			xa_erase(&pool->stg_members, pool_info_cmd->member_id);
+			return -EINVAL;
+		}
+		rmr_map_set_dirty_all(map, MAP_NO_FILTER);
+	}
+	rmr_srv_mark_maps_dirty((struct rmr_srv_pool *)pool->priv);
+
+	return 0;
+}
+
+/**
+ * rmr_srv_query() - Compute the metadata size needed for a store
+ *
+ * @pool:		Pool the store belongs to, or NULL to estimate before
+ *			the pool exists (RMR_SRV_MAX_QDEPTH is assumed then).
+ * @mapped_size:	Size of the store; shifted by (ilog2(chunk_size) - 9)
+ *			to get the number of chunks.
+ * @attr:		Output; attr->rmr_md_size receives the result.
+ *
+ * The size is the space of RMR_POOL_MAX_SESS maps plus RMR_MD_SIZE plus one
+ * last_io_idx element per queue entry, divided by SECTOR_SIZE.
+ *
+ * NOTE(review): the final division truncates; if the byte total is not a
+ * multiple of SECTOR_SIZE the result is rounded down -- confirm intended.
+ *
+ * Return:
+ *	0 on success, -ENOMEM if the scratch map cannot be allocated.
+ */
+int rmr_srv_query(struct rmr_pool *pool, u64 mapped_size, struct rmr_attrs *attr)
+{
+	struct rmr_srv_pool *srv_pool;
+	struct rmr_dirty_id_map *scratch;
+	/*
+	 * Without a pool we are estimating the md size before the pool is
+	 * created; assume the max queue depth in that case.
+	 */
+	size_t queue_depth = RMR_SRV_MAX_QDEPTH;
+
+	if (pool) {
+		srv_pool = (struct rmr_srv_pool *)pool->priv;
+		queue_depth = srv_pool->queue_depth;
+	}
+
+	/*
+	 * Throw-away map structure, only there so that the update map param
+	 * function can be reused for the calculation.
+	 */
+	scratch = (struct rmr_dirty_id_map *)get_zeroed_page(GFP_KERNEL);
+	if (!scratch) {
+		pr_err("%s: Cannot allocate map\n", __func__);
+		return -ENOMEM;
+	}
+
+	scratch->no_of_chunks = (mapped_size >> (ilog2(chunk_size) - 9));
+	rmr_map_update_page_params(scratch);
+
+	attr->rmr_md_size = (scratch->total_slp * PAGE_SIZE * RMR_POOL_MAX_SESS) + RMR_MD_SIZE;
+	attr->rmr_md_size += (queue_depth * sizeof(*srv_pool->last_io_idx));
+	attr->rmr_md_size /= SECTOR_SIZE;
+
+	free_page((unsigned long)scratch);
+
+	return 0;
+}
+EXPORT_SYMBOL(rmr_srv_query);
+
+/**
+ * rmr_srv_set_map() - Create the dirty map for this server's member in the pool
+ *
+ * @pool:	The pool for which the map is to be created.
+ * @mode:	Registration mode; if %RMR_SRV_DISK_REPLACE, any existing map for
+ *		this member is removed before creating the new one.
+ *
+ * Description:
+ *	Records pool->mapped_size in this member's pool metadata slot and
+ *	registers this node's map through rmr_srv_add_store_member(). If
+ *	that fails, the mapped size in the metadata slot is reset to 0.
+ *
+ * Return:
+ *	0 on success, negative error code on failure.
+ */
+static int rmr_srv_set_map(struct rmr_pool *pool, enum rmr_srv_register_disk_mode mode)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	int slot, err;
+
+	pr_info("%s: Mapped size of the pool %s is set to %lld\n",
+		__func__, pool->poolname, pool->mapped_size);
+
+	/* Update mapped_size in the pool metadata. */
+	slot = rmr_pool_find_md(&pool->pool_md, srv_pool->member_id, true);
+	if (slot < 0) {
+		pr_err("No space for new member %d.\n", srv_pool->member_id);
+		return -ENOMEM;
+	}
+	pool->pool_md.srv_md[slot].mapped_size = pool->mapped_size;
+
+	/* The existing map is irrelevant if user asked for store REPLACE. */
+	if (mode == RMR_SRV_DISK_REPLACE)
+		rmr_pool_remove_map(pool, srv_pool->member_id);
+
+	err = rmr_srv_add_store_member(pool, srv_pool->member_id);
+	if (!err)
+		return 0;
+
+	pr_err("%s: rmr_srv_add_store_member failed %d\n", __func__, err);
+	pool->pool_md.srv_md[slot].mapped_size = 0;
+
+	return err;
+}
+
+/*
+ * Tell whether at least one non-sync session is attached to @pool by walking
+ * pool->sess_list.
+ *
+ * NOTE(review): no lock is taken here; callers are presumably expected to
+ * serialise against session list changes -- confirm for each call site.
+ */
+static bool rmr_srv_pool_has_non_sync_sess(struct rmr_pool *pool)
+{
+	struct rmr_srv_pool_sess *pool_sess;
+
+	list_for_each_entry(pool_sess, &pool->sess_list, pool_entry) {
+		if (!pool_sess->sync)
+			return true;
+	}
+	return false;
+}
+
+/**
+ * rmr_srv_register() - Register a backend store with an RMR server pool
+ *
+ * @poolname:	Name of the pool to which the store is to be registered.
+ * @ops:	Store operations pointer.
+ * @priv:	Private data for the store.
+ * @mapped_size:	Size of the storage device in sectors.
+ * @mode:	Registration mode: %RMR_SRV_DISK_CREATE for a new store,
+ *		%RMR_SRV_DISK_REPLACE to replace an existing one, or
+ *		%RMR_SRV_DISK_ADD to rejoin an existing pool.
+ *
+ * Description:
+ *	An RMR server pool requires a backend store to service I/Os.
+ *	This function registers that store, sets up the pool's dirty map for
+ *	this member (CREATE/REPLACE) or reads the metadata back from the store
+ *	(ADD), and records the marked_create flag for validation when the
+ *	first client joins.
+ *
+ *	CREATE is refused while non-sync sessions or a map for this member
+ *	exist; REPLACE is refused without a non-sync session. A store whose
+ *	size differs from an already known mapped size is refused as well.
+ *
+ *	On success the srv_pool reference taken here and a module reference
+ *	are kept; rmr_srv_unregister() drops both. On failure the pool's
+ *	mapped size is put back to the value it had on entry.
+ *
+ * Return:
+ *	Pointer to the rmr_pool on success, NULL on error.
+ */
+struct rmr_pool *rmr_srv_register(char *poolname, struct rmr_srv_store_ops *ops, void *priv,
+				  u64 mapped_size, enum rmr_srv_register_disk_mode mode)
+{
+	struct rmr_pool *pool;
+	struct rmr_srv_io_store *io_store;
+	struct rmr_srv_pool *srv_pool;
+	u32 group_id = rmr_pool_hash(poolname);
+	enum rmr_srv_pool_state state;
+	u64 prev_mapped_size;
+	int ret;
+
+	srv_pool = rmr_find_and_get_srv_pool(group_id);
+	if (IS_ERR(srv_pool)) {
+		pr_err("pool %s does not exists: %pe\n", poolname, srv_pool);
+		return NULL;
+	}
+	pool = srv_pool->pool;
+
+	mutex_lock(&srv_pool->srv_pool_lock);
+	if (mode == RMR_SRV_DISK_CREATE &&
+	    (rmr_srv_pool_has_non_sync_sess(pool) ||
+	     rmr_pool_find_map(pool, srv_pool->member_id))) {
+		pr_err("%s: Cannot register (create) new backend for %s; Sessions/Map exists\n",
+		       __func__, poolname);
+		ret = -EEXIST;
+		goto put_err;
+	}
+
+	if (mode == RMR_SRV_DISK_REPLACE &&
+	    (!rmr_srv_pool_has_non_sync_sess(pool))) {
+		pr_err("%s: Cannot register (replace) new backend for %s; No non-sync session\n",
+		       __func__, poolname);
+		ret = -EINVAL;
+		goto put_err;
+	}
+
+	if (srv_pool->io_store) {
+		pr_err("Srv pool %s already has store registered\n", poolname);
+		ret = -EBUSY;
+		goto put_err;
+	}
+
+	if (pool->mapped_size && pool->mapped_size != mapped_size) {
+		pr_err("Pool %s already has mapped size %lld, cannot register store with %lld\n",
+		       poolname, pool->mapped_size, mapped_size);
+		ret = -EINVAL;
+		goto put_err;
+	}
+
+	io_store = kzalloc(sizeof(*io_store), GFP_KERNEL);
+	if (!io_store) {
+		pr_err("Failed to allocate io_store for %s\n", poolname);
+		ret = -ENOMEM;
+		goto put_err;
+	}
+
+	/* Remember the old size so a failed registration does not leave it behind. */
+	prev_mapped_size = pool->mapped_size;
+	pool->mapped_size = mapped_size;
+	io_store->ops = ops;
+	io_store->priv = priv;
+	srv_pool->io_store = io_store;
+
+	/* The pool updates its number of tracking chunks with the mapped size just provided. */
+	rmr_pool_update_no_of_chunk(pool);
+
+	if (mode == RMR_SRV_DISK_CREATE || mode == RMR_SRV_DISK_REPLACE) {
+		ret = rmr_srv_set_map(pool, mode);
+		if (ret) {
+			pr_err("%s: failed to set maps in rmr pool %s, err %d\n",
+			       __func__, poolname, ret);
+			goto free_io_store;
+		}
+	} else if (mode == RMR_SRV_DISK_ADD) {
+		/*
+		 * Read the pool metadata stored on this device before md_sync writes
+		 * new metadata to the store.
+		 */
+		ret = rmr_srv_refresh_md(srv_pool);
+		if (ret) {
+			pr_err("%s: cannot refresh md of the pool\n", __func__);
+			goto free_io_store;
+		}
+	} else {
+		pr_err("%s: Wrong register disk mode %d\n", __func__, mode);
+		ret = -EINVAL;
+		goto free_io_store;
+	}
+
+	srv_pool->marked_create = (mode == RMR_SRV_DISK_CREATE);
+	atomic_set(&srv_pool->store_state, true);
+	rmr_srv_mark_pool_md_dirty(srv_pool);
+
+	/* A pool that already has sessions (NORMAL/NO_IO) keeps its state. */
+	state = atomic_read(&srv_pool->state);
+	if (state != RMR_SRV_POOL_STATE_NORMAL &&
+	    state != RMR_SRV_POOL_STATE_NO_IO)
+		rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_REGISTERED);
+	mutex_unlock(&srv_pool->srv_pool_lock);
+
+	__module_get(THIS_MODULE);
+	pr_info("Registered store with pool %s\n", poolname);
+
+	return srv_pool->pool;
+
+free_io_store:
+	kfree(io_store);
+	srv_pool->io_store = NULL;
+	/*
+	 * Without this a later registration with a store of a different size
+	 * would be refused because of the size of a store that never got in.
+	 */
+	pool->mapped_size = prev_mapped_size;
+put_err:
+	mutex_unlock(&srv_pool->srv_pool_lock);
+	rmr_put_srv_pool(srv_pool);
+	return NULL;
+}
+EXPORT_SYMBOL(rmr_srv_register);
+
+/*
+ * Wipe the on-disk metadata of @pool by writing zeroes over it, then remove
+ * this server's own member (map and stg_members entry) from the pool.
+ *
+ * Write failures are only logged; the wipe carries on with the next region.
+ *
+ * NOTE(review): map_data_offset is a u32 while per_map_size is u64; for
+ * large maps the computed offset could be truncated -- confirm the range.
+ */
+static void rmr_srv_delete_md(struct rmr_pool *pool)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	struct rmr_dirty_id_map *map = NULL;
+	int err, lock_idx;
+	u32 map_region_offset = rmr_bitmap_offset(pool->pool_md.queue_depth) + RMR_MAP_BUF_HDR_SIZE;
+	u64 per_map_size = 0;
+	u64 len;
+	u8 map_idx;
+	void *buf;
+
+	/*
+	 * It could happen to access the pool while the pool is not there. Use reference counting
+	 * for server pool to avoid the issue.
+	 */
+	if (!rmr_get_srv_pool(srv_pool)) {
+		pr_err("%s: pool is not there\n", __func__);
+		return;
+	}
+
+	/* Zero-filled source buffer, reused for every write below. */
+	len = rmr_bitmap_offset(pool->pool_md.queue_depth) + PAGE_SIZE;
+	buf = kzalloc(len, GFP_KERNEL);
+	if (!buf)
+		goto put_pool;
+
+	/*
+	 * On-disk layout of rmr pool metadata:
+	 *
+	 *   0           RMR_MD_SIZE   +last_io_len    +PAGE_SIZE
+	 *   +-----------+-------------+---------------+--------------------+
+	 *   | pool_md   | last_io     | hdr_region    | maps_region ...    |
+	 *   +-----------+-------------+---------------+--------------------+
+	 *   <-RMR_MD_SIZE><-last_io_len><--PAGE_SIZE--> maps_cnt * per_map
+	 */
+	err = process_md_io(pool, NULL, 0, len, RMR_OP_MD_WRITE, buf);
+	if (err)
+		pr_warn("%s: failed to process md write io with err 0x%x.\n", __func__, err);
+
+	/*
+	 * Zero the bitmap on disk using O(1) offset formula.
+	 */
+	lock_idx = srcu_read_lock(&pool->map_srcu);
+	for (map_idx = 0; map_idx < pool->maps_cnt; map_idx++) {
+		u32 map_data_offset;
+		u64 no_of_slps;
+		int i, j;
+
+		/* Readers of pool->maps hold map_srcu, so use the SRCU dereference. */
+		map = srcu_dereference(pool->maps[map_idx], &pool->map_srcu);
+		if (WARN_ON(!map))
+			break;
+
+		per_map_size = map->total_slp * PAGE_SIZE;
+		map_data_offset = map_region_offset + map_idx * per_map_size;
+
+		/* One zeroed page is written for every second-level page of the map. */
+		for (i = 0; i < map->no_of_flp; i++) {
+			if (i == (map->no_of_flp - 1))
+				no_of_slps = map->no_of_slp_in_last_flp;
+			else
+				no_of_slps = NO_OF_SLP_PER_FLP;
+
+			for (j = 0; j < no_of_slps; j++) {
+				err = process_md_io(pool, NULL, map_data_offset,
+						    PAGE_SIZE, RMR_OP_MD_WRITE, buf);
+				if (err)
+					pr_warn("%s: bitmap write failed at 0x%x, err 0x%x.\n",
+						__func__, map_data_offset, err);
+				map_data_offset += PAGE_SIZE;
+			}
+		}
+	}
+	srcu_read_unlock(&pool->map_srcu, lock_idx);
+
+	rmr_srv_delete_store_member(pool, srv_pool->member_id);
+
+	/* buf comes from kzalloc(), so it has to go back through kfree(). */
+	kfree(buf);
+put_pool:
+	rmr_put_srv_pool(srv_pool);
+}
+
+/**
+ * rmr_srv_unregister() - Unregister the backend store from rmr server pool
+ *
+ * @poolname:	Name of the pool from which the store is to be unregistered
+ * @delete:	If true, delete all the metadata associated with this pool
+ *
+ * Description:
+ *	rmr server pool needs a backend store which serves the IOs
+ *	This function is used to unregister a backend store from rmr server pool.
+ *
+ *	The sync thread is stopped and all inflight IDs are drained before the
+ *	store is released. Metadata is only deleted when @delete is set and
+ *	the server is marked for delete or was registered in create mode. If
+ *	no non-sync session is left the pool goes back to EMPTY. The srv_pool
+ *	and module references held since rmr_srv_register() are dropped.
+ *
+ * Return:
+ *	None
+ */
+void rmr_srv_unregister(char *poolname, bool delete)
+{
+	struct rmr_pool *pool;
+	struct rmr_srv_pool *srv_pool;
+
+	mutex_lock(&pool_mutex);
+	pool = rmr_find_pool(poolname);
+	mutex_unlock(&pool_mutex);
+
+	if (!pool) {
+		pr_err("%s, Pool %s does not exists\n", __func__, poolname);
+		return;
+	}
+
+	srv_pool = (struct rmr_srv_pool *)pool->priv;
+	mutex_lock(&srv_pool->srv_pool_lock);
+
+	if (!srv_pool->io_store) {
+		pr_err("Srv pool %s not registered\n", poolname);
+		mutex_unlock(&srv_pool->srv_pool_lock);
+		return;
+	}
+
+	if (srv_pool->marked_delete) {
+		if (!delete) {
+			pr_err("%s: Storage server marked for delete, but delete mode not set\n",
+			       __func__);
+			pr_err("%s: Continuing with only removal\n", __func__);
+		}
+	} else if (!srv_pool->marked_create && delete) {
+		pr_err("%s: Storage server not marked for delete, abandoning delete.\n", __func__);
+		delete = false;
+	}
+
+	rmr_srv_stop_sync_and_unset_store(pool);
+
+	/* Drain every inflight ID before the store goes away. */
+	percpu_ref_kill_and_confirm(&pool->ids_inflight_ref, rmr_pool_confirm_inflight_ref);
+	wait_for_completion(&pool->complete_done);
+	wait_for_completion(&pool->confirm_done);
+
+	/*
+	 * Re-init so metadata IO can go in if needed
+	 */
+	reinit_completion(&pool->complete_done);
+	reinit_completion(&pool->confirm_done);
+	percpu_ref_reinit(&pool->ids_inflight_ref);
+
+	if (delete)
+		rmr_srv_delete_md(pool);
+
+	kfree(srv_pool->io_store);
+	srv_pool->io_store = NULL;
+
+	mutex_lock(&pool->sess_lock);
+	if (!rmr_srv_pool_has_non_sync_sess(pool))
+		rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_EMPTY);
+	mutex_unlock(&pool->sess_lock);
+
+	srv_pool->marked_delete = false;
+	/*
+	 * Reset the size while srv_pool_lock is still held: rmr_srv_register()
+	 * checks io_store and mapped_size together under this lock.
+	 */
+	pool->mapped_size = 0;
+	mutex_unlock(&srv_pool->srv_pool_lock);
+
+	rmr_put_srv_pool(srv_pool);
+
+	pr_info("Unregistered store with pool %s\n", poolname);
+
+	module_put(THIS_MODULE);
+}
+EXPORT_SYMBOL(rmr_srv_unregister);
+
+/**
+ * rmr_srv_pool_cmd_with_rsp() - Sends a user command to all sessions of the internal (sync) clt
+ *
+ * @pool:	rmr pool to which the command is for
+ * @conf:	confirmation function to be called after completion
+ * @priv:	pointer to priv data, to be returned to user while calling conf function
+ * @usr_vec:	kvec containing user data (mostly command messages?)
+ * @nr:		number of kvecs
+ * @buf:	buf where the response from the user server is to be directed
+ * @buf_len:	length of the buffer
+ * @size:	size of the buf to be sent to a single session
+ *
+ * Description:
+ *	Interface for the user to send commands to the storage nodes connected
+ *	through the internal network of this rmr pool. The command is handed
+ *	to the rmr-client (sync) pool of this storage node, which sends it to
+ *	all the storage nodes it is connected to.
+ *	The command is sent as a read, so that the response from the user srv
+ *	side can be received; @buf is where that response ends up. The size of
+ *	the buffer is set during rmr_clt_open.
+ *
+ * Return:
+ *	0 on success
+ *	-EAGAIN if no sync pool is assigned to this server pool
+ *	other negative errno in case of error
+ *
+ * Context:
+ *	Inflight commands will block map update, until the inflights are completed.
+ */
+int rmr_srv_pool_cmd_with_rsp(struct rmr_pool *pool, rmr_conf_fn *conf, void *priv,
+			     const struct kvec *usr_vec, size_t nr, void *buf, int buf_len,
+			     size_t size)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+
+	if (srv_pool->clt)
+		return rmr_clt_cmd_with_rsp(srv_pool->clt, conf, priv, usr_vec, nr,
+					    buf, buf_len, size);
+
+	pr_warn("srv pool %s does not have sync pool assigned.\n",
+		pool->poolname);
+	return -EAGAIN;
+}
+EXPORT_SYMBOL(rmr_srv_pool_cmd_with_rsp);
+
+/*
+ * Send an RMR_CMD_SEND_DISCARD command for @member_id to all nodes reachable
+ * through the sync client pool of @pool.
+ *
+ * Nothing is sent (and 0 is returned) when @member_id is not this server's
+ * own member id: in that case this server is the receiving node of the
+ * discard request. Otherwise returns the result of the send.
+ */
+static int rmr_srv_send_discard_all(struct rmr_pool *pool, u8 member_id)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	struct rmr_msg_pool_cmd msg = {};
+	struct rmr_pool *sync_pool;
+	int ret;
+
+	if (member_id != srv_pool->member_id)
+		return 0;
+
+	sync_pool = srv_pool->clt;
+
+	pr_info("%s: Send discards across storage nodes for pool %s\n",
+		__func__, pool->poolname);
+
+	rmr_clt_init_cmd(sync_pool, &msg);
+	msg.cmd_type = RMR_CMD_SEND_DISCARD;
+	msg.send_discard_cmd.member_id = member_id;
+
+	ret = rmr_clt_pool_send_all(sync_pool, &msg);
+	if (!ret)
+		return 0;
+
+	pr_err("Failed to send discard cmd for pool %s: %d\n",
+	       pool->poolname, ret);
+	return ret;
+}
+
+/**
+ * rmr_srv_discard_id() - discard the data chunks of length from offset on disk
+ *
+ * @pool:	source pool.
+ * @offset:	offset in bytes.
+ * @length:	length in bytes. If 0, all data chunks are discarded.
+ * @member_id:	member id of the storage node to discard the data from. If 0, then the node is
+ *		this server pool.
+ * @sync:	indicates whether to send sync requests to other connected nodes.
+ *
+ * Return:
+ *	0 on success, err code otherwise
+ *
+ * Description:
+ *	Discards the data chunks on the server with @member_id. When the
+ *	target is this server and @sync is set, the discard is first announced
+ *	to all connected nodes; nothing is changed locally if that fails.
+ *	Then the discard_entries flag of the corresponding srv_md is set and
+ *	the affected chunks are marked dirty in the member's map.
+ */
+int rmr_srv_discard_id(struct rmr_pool *pool, u64 offset, u64 length, u8 member_id, bool sync)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	struct rmr_dirty_id_map *map;
+	rmr_id_t chunk_id;
+	int md_idx, ret;
+
+	/* Member id 0 stands for this server. */
+	if (!member_id)
+		member_id = srv_pool->member_id;
+
+	map = rmr_pool_find_map(pool, member_id);
+	if (!map) {
+		pr_err("for srv pool %s cannot find map for member_id %u\n",
+		       pool->poolname, member_id);
+		return -EINVAL;
+	}
+
+	md_idx = rmr_pool_find_md(&pool->pool_md, member_id, false);
+	if (md_idx < 0) {
+		pr_err("%s: for srv pool %s cannot find md for member_id %u\n",
+		       __func__, pool->poolname, member_id);
+		return -EINVAL;
+	}
+
+	/*
+	 * Only the node that owns the discarded data announces it, and only
+	 * when asked to sync. A node that merely received the announcement
+	 * skips this and goes on to mark the chunks dirty.
+	 */
+	if (sync && member_id == srv_pool->member_id) {
+		if (!srv_pool->clt) {
+			pr_err("pool %s has no sync pool assigned. Cannot send discards.\n",
+			       pool->poolname);
+			return -ENXIO;
+		}
+
+		/*
+		 * This node tries to send discards to all its connected nodes. The other node
+		 * that has received the discards will start a new round. In the end, all normal
+		 * nodes that are connected to this node should receive the discards.
+		 */
+		ret = rmr_srv_send_discard_all(pool, member_id);
+		if (ret) {
+			pr_err("%s: no server receives discards for pool %s: %d\n",
+			       __func__, pool->poolname, ret);
+			return ret;
+		}
+	}
+
+	/*
+	 * Set the discard_entries flag of the corresponding srv_md true. Be careful that setting
+	 * the wrong srv_md will lead to loops of discards.
+	 */
+	pool->pool_md.srv_md[md_idx].discard_entries = true;
+	rmr_srv_mark_pool_md_dirty(srv_pool);
+
+	if (!length) {
+		/* discard all data chunks */
+		rmr_map_set_dirty_all(map, MAP_ENTRY_UNSYNCED);
+		pr_info("%s: Discard all data chunks for member_id %u in srv_pool %s: %u\n",
+			__func__, member_id, pool->poolname, srv_pool->member_id);
+	} else {
+		rmr_map_calc_chunk(pool, offset, length, &chunk_id);
+		rmr_map_set_dirty(map, chunk_id, MAP_ENTRY_UNSYNCED);
+	}
+
+	rmr_map_clear_filter_all(map, MAP_ENTRY_UNSYNCED);
+	rmr_srv_mark_maps_dirty(srv_pool);
+
+	return 0;
+}
+EXPORT_SYMBOL(rmr_srv_discard_id);
+
+/*
+ * Apply RMR_STORE_SET_REPLACE() to the pool's map version, then flush the
+ * pool metadata of the owning server pool.
+ */
+void rmr_srv_replace_store(struct rmr_pool *pool)
+{
+	struct rmr_srv_pool *srv_pool;
+
+	srv_pool = (struct rmr_srv_pool *)pool->priv;
+
+	RMR_STORE_SET_REPLACE(pool->map_ver);
+	rmr_srv_flush_pool_md(srv_pool);
+}
+EXPORT_SYMBOL(rmr_srv_replace_store);
+
+/**
+ * rmr_srv_pool_check_store() - Ask the registered store whether IO may proceed
+ *
+ * @pool:	pool whose store is queried
+ *
+ * Return:
+ *	0 when the pool has no store or the store has no ops assigned;
+ *	otherwise the value returned by the store's io_allowed() callback.
+ *	Callers treat a zero result as "IO not allowed".
+ *
+ * Description:
+ *	The store registered with a rmr-srv pool provides the io_allowed()
+ *	callback; this helper only validates that the callback can be reached
+ *	and forwards the store's private pointer to it.
+ */
+static int rmr_srv_pool_check_store(struct rmr_pool *pool)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	struct rmr_srv_io_store *io_store = srv_pool->io_store;
+
+	if (!io_store) {
+		pr_debug("for srv pool %s no store assigned\n", pool->poolname);
+		return 0;
+	}
+
+	if (!io_store->ops) {
+		pr_err("for pool %s store has no ops assigned\n", pool->poolname);
+		return 0;
+	}
+
+	return io_store->ops->io_allowed(io_store->priv);
+}
+
+/**
+ * process_msg_io() - Process IO message
+ *
+ * @srv_sess:	rmr srv session over which the message was received
+ * @rtrs_op:	rtrs IO context
+ * @data:	pointer to data buf
+ * @datalen:	len of data buf
+ * @usr:	pointer to user buf
+ * @usrlen:	len of user buf
+ *
+ * Return:
+ *	0 on success
+ *	negative error code otherwise
+ *
+ * Description:
+ *	Perform some basic checks.
+ *	Create an IO request and start its state machine.
+ *	If any check after the pool lookup fails, the pool is moved to
+ *	RMR_SRV_POOL_STATE_NO_IO and rmr_srv_sess_put_pool() is called before
+ *	returning the error.
+ *
+ *	All error messages on this per-IO path are ratelimited so that a
+ *	misbehaving peer cannot flood the log.
+ */
+static int process_msg_io(struct rmr_srv_sess *srv_sess,
+			  struct rtrs_srv_op *rtrs_op, void *data,
+			  u32 datalen, const void *usr, size_t usrlen)
+{
+	const struct rmr_msg_io *msg = usr;
+	struct rmr_pool *pool;
+	struct rmr_srv_pool *srv_pool;
+	struct rmr_srv_req *req;
+	int err = 0;
+	u32 group_id = le32_to_cpu(msg->hdr.group_id);
+
+	pool = rmr_srv_sess_get_pool(srv_sess, group_id);
+	if (IS_ERR(pool)) {
+		/* group_id is unsigned, print it as such */
+		pr_err_ratelimited("Got I/O request on session %s for unknown pool group id %u: %pe\n",
+				   srv_sess->sessname, group_id, pool);
+		return PTR_ERR(pool);
+	}
+
+	srv_pool = (struct rmr_srv_pool *)pool->priv;
+
+	/*
+	 * No new references will come in after we have killed the percpu_ref.
+	 * Percpu_ref_tryget_live() returns false when @confirm_kill in
+	 * percpu_ref_kill_and_confirm() is done.
+	 */
+	if (!percpu_ref_tryget_live(&pool->ids_inflight_ref)) {
+		err = -EIO;
+		goto no_put;
+	}
+
+	if (!atomic_read(&srv_pool->store_state) ||
+	    atomic_read(&srv_pool->state) != RMR_SRV_POOL_STATE_NORMAL) {
+		pr_err_ratelimited("server pool %s is not up for IO (state = %s)\n",
+				   pool->poolname,
+				   rmr_get_srv_pool_state_name(atomic_read(&srv_pool->state)));
+		err = -EIO;
+		goto put_pool;
+	}
+
+	/*
+	 * The IOs coming from internal sync sessions are always READ.
+	 */
+	if (msg->sync && rmr_op(le32_to_cpu(msg->flags)) != RMR_OP_READ) {
+		pr_err_ratelimited("process_msg_io: pool %s write IO from internal connection.\n",
+				   pool->poolname);
+		err = -EIO;
+		goto put_pool;
+	}
+
+	/*
+	 * For non internal IOs, make sure the underlying store is ready for IO
+	 */
+	if (!msg->sync && !rmr_srv_pool_check_store(pool)) {
+		pr_err_ratelimited("process_msg_io: pool %s IO not allowed\n", pool->poolname);
+		err = -EIO;
+		goto put_pool;
+	}
+
+	req = rmr_srv_req_create(msg, srv_pool, rtrs_op, data, datalen, rmr_srv_endreq);
+	if (IS_ERR(req)) {
+		pr_err_ratelimited("Failed to create rmr_req %pe\n", req);
+
+		//TODO: do we have to rtrs_srv_resp_rdma here ?
+		err = PTR_ERR(req);
+		goto put_pool;
+	}
+
+	/* From here on the request owns the inflight and pool references. */
+	rmr_req_submit(req);
+	return 0;
+
+put_pool:
+	percpu_ref_put(&pool->ids_inflight_ref);
+
+no_put:
+	rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_NO_IO);
+	rmr_srv_sess_put_pool(pool);
+	return err;
+}
+
+/*
+ * Take one sync permit, sleeping while the number of in-flight sync requests
+ * (including this one) is at or above sync_queue_depth.
+ *
+ * The sleeper is woken by rmr_srv_put_sync_permit() via wake_up_process().
+ * The task state is set to TASK_INTERRUPTIBLE *before* the counter is
+ * re-checked; checking first and changing state afterwards would allow a
+ * wake-up that lands in between to be lost, leaving the caller asleep forever.
+ *
+ * Return: 0 once the permit is held, -EINTR (permit released) if the sync
+ * thread is no longer in SYNC_THREAD_RUNNING state after a wake-up.
+ */
+int rmr_srv_get_sync_permit(struct rmr_srv_pool *srv_pool)
+{
+	atomic_inc(&srv_pool->in_flight_sync_reqs);
+
+	for (;;) {
+		set_current_state(TASK_INTERRUPTIBLE);
+		if (atomic_read(&srv_pool->in_flight_sync_reqs) < sync_queue_depth)
+			break;
+
+		/* Permit overflow; sleep */
+		schedule();
+
+		if (atomic_read(&srv_pool->thread_state) != SYNC_THREAD_RUNNING) {
+			atomic_dec(&srv_pool->in_flight_sync_reqs);
+
+			return -EINTR;
+		}
+	}
+	/* We did not sleep on the last pass; undo the state change. */
+	__set_current_state(TASK_RUNNING);
+
+	return 0;
+}
+
+/*
+ * Return one sync permit and wake the task stored in srv_pool->th_tsk, which
+ * may be sleeping until the in-flight counter drops.
+ */
+void rmr_srv_put_sync_permit(struct rmr_srv_pool *srv_pool)
+{
+	atomic_dec(&srv_pool->in_flight_sync_reqs);
+	wake_up_process(srv_pool->th_tsk);
+}
+
+/*
+ * Sync thread body: walk every chunk of this server's own dirty map and start
+ * a sync for each dirty entry that nobody else is syncing yet. Afterwards
+ * (or on error / stop request) wait until all sync permits have been returned
+ * and mark the thread SYNC_THREAD_STOPPED.
+ *
+ * @arg is the struct rmr_srv_pool to sync.
+ *
+ * Return: 0 on success, -EINTR if a stop was requested, -EINVAL if the pool
+ * left the NORMAL state, or the error from rmr_srv_sync_chunk_id().
+ */
+static int rmr_srv_sync_map(void *arg)
+{
+	struct rmr_srv_pool *srv_pool = arg;
+	struct rmr_pool *pool = srv_pool->pool;
+	struct rmr_dirty_id_map *map;
+	rmr_id_t rmr_id;
+	struct rmr_map_entry *entry;
+	int err = 0;
+	u64 i;
+
+	pr_info("Sync thread starting!\n");
+
+	map = rmr_pool_find_map(pool, srv_pool->member_id);
+	if (!map) {
+		/*
+		 * We do not need to error out here.
+		 * Since no session has ever been added to this pool,
+		 * it technically means this pool is in sync state.
+		 */
+		pr_info("No map found for pool %s\n", pool->poolname);
+		goto out;
+	}
+
+	rmr_id.a = 1;
+	for (i = 0; i < map->no_of_chunks; i++) {
+		if (atomic_read(&srv_pool->thread_state) == SYNC_THREAD_REQ_STOP) {
+			pr_info("Request to stop sync thread\n");
+			err = -EINTR;
+			goto err;
+		}
+
+		if (!atomic_read(&srv_pool->store_state) ||
+		    atomic_read(&srv_pool->state) != RMR_SRV_POOL_STATE_NORMAL) {
+			atomic_set(&srv_pool->thread_state, SYNC_THREAD_WAIT);
+			pr_err("Pool not in desired state\n");
+			/* Unsure what error to return here */
+			err = -EINVAL;
+			goto err;
+		}
+
+		rmr_id.b = i;
+		entry = rmr_map_get_dirty_entry(map, rmr_id);
+		if (entry) {
+			/* sync_cnt == -1 means "no sync running"; claim it with 0 */
+			if (atomic_cmpxchg(&entry->sync_cnt, -1, 0) != -1) {
+				/* someone has already started sync for this id */
+				continue;
+			}
+
+			err = rmr_srv_sync_chunk_id(srv_pool, entry, rmr_id, true);
+			if (err) {
+				/* this is to undo the previous cmpxchg if the error in
+				 * rmr_srv_sync_chunk_id happened before any requests were created
+				 */
+				atomic_cmpxchg(&entry->sync_cnt, 0, -1);
+				pr_err("Failed to sync chunk (%llu, %llu)\n", rmr_id.a, rmr_id.b);
+				goto err;
+			}
+		}
+	}
+
+	/*
+	 * Finished syncing chunks,
+	 * Now change the thread state to wait,
+	 * to wait for the in flight syncs
+	 */
+	atomic_set(&srv_pool->thread_state, SYNC_THREAD_WAIT);
+
+err:
+	/*
+	 * Wait for all permits to get freed, since the completion path needs
+	 * this thread to be up and running.
+	 *
+	 * The task state is changed before the counter is tested: testing
+	 * first would let a wake_up_process() from the completion path slip
+	 * in between the test and the state change and be lost.
+	 */
+	for (;;) {
+		set_current_state(TASK_INTERRUPTIBLE);
+		if (atomic_read(&srv_pool->in_flight_sync_reqs) == 0)
+			break;
+		schedule();
+		//TODO: should it be timeout?
+	}
+	__set_current_state(TASK_RUNNING);
+
+out:
+	atomic_set(&srv_pool->thread_state, SYNC_THREAD_STOPPED);
+
+	pr_info("Sync thread exiting with err %d\n", err);
+	return err;
+}
+
+/*
+ * Create and start the sync thread for @srv_pool.
+ *
+ * The thread is created stopped (kthread_create) and only woken after
+ * srv_pool->th_tsk and thread_state have been published. With kthread_run()
+ * the new thread could run first and observe a thread_state that is not yet
+ * SYNC_THREAD_RUNNING (making rmr_srv_get_sync_permit() fail), have its final
+ * SYNC_THREAD_STOPPED overwritten by the late RUNNING store, or have a
+ * completion call wake_up_process() on a th_tsk that is not assigned yet.
+ *
+ * Return: 0 on success, negative error code from kthread creation otherwise.
+ */
+int rmr_srv_sync_thread_start(struct rmr_srv_pool *srv_pool)
+{
+	struct task_struct *tsk;
+
+	atomic_set(&srv_pool->in_flight_sync_reqs, 0);
+
+	tsk = kthread_create(rmr_srv_sync_map, srv_pool, "rmr_srv_sync_thread");
+	srv_pool->th_tsk = tsk;
+	if (IS_ERR(tsk)) {
+		atomic_set(&srv_pool->thread_state, SYNC_THREAD_STOPPED);
+		return PTR_ERR(tsk);
+	}
+
+	atomic_set(&srv_pool->thread_state, SYNC_THREAD_RUNNING);
+	wake_up_process(tsk);
+	return 0;
+}
+
+/*
+ * Ask a running sync thread to stop.
+ *
+ * The RUNNING -> REQ_STOP transition is done with a single cmpxchg. A
+ * separate read followed by a set could overwrite a state the thread entered
+ * in between (SYNC_THREAD_WAIT or SYNC_THREAD_STOPPED) with REQ_STOP, after
+ * which nobody would ever move the state to STOPPED again.
+ *
+ * Return: always 0.
+ */
+int rmr_srv_sync_thread_stop(struct rmr_srv_pool *srv_pool)
+{
+	if (atomic_cmpxchg(&srv_pool->thread_state, SYNC_THREAD_RUNNING,
+			   SYNC_THREAD_REQ_STOP) == SYNC_THREAD_RUNNING)
+		wake_up_process(srv_pool->th_tsk);
+
+	return 0;
+}
+
+/*
+ * Called when a sync request failed: requests the sync thread to stop.
+ */
+void rmr_srv_sync_req_failed(struct rmr_srv_pool *srv_pool)
+{
+	/*
+	 * TODO: Investigate the necessity to change server state
+	 * to RMR_SRV_POOL_STATE_NO_IO for sync_req failure, i.e. whether
+	 * rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_NO_IO)
+	 * should be called here as well.
+	 */
+	rmr_srv_sync_thread_stop(srv_pool);
+}
+
+/*
+ * Fill @buf with dirty-map data starting at the position (map_idx, slp_idx)
+ * given in @map_buf_cmd. When rmr_pool_maps_to_buf() reports that nothing was
+ * produced, a header with member_id 0 is written instead.
+ */
+static void rmr_srv_read_map_buf(struct rmr_pool *pool, void *buf, size_t buflen,
+				 const struct rmr_msg_map_buf_cmd *map_buf_cmd)
+{
+	struct rmr_map_buf_hdr *hdr;
+	u64 slp_idx = map_buf_cmd->slp_idx;
+	u8 map_idx = map_buf_cmd->map_idx;
+	int written;
+
+	written = rmr_pool_maps_to_buf(pool, &map_idx, &slp_idx, buf, buflen, MAP_NO_FILTER);
+	if (written != 0)
+		return;
+
+	/* No more dirty map to write */
+	hdr = (struct rmr_map_buf_hdr *)buf;
+	hdr->version = RMR_MAP_FORMAT_VER;
+	hdr->member_id = 0;
+}
+
+/*
+ * Copy this server's own srv_md entry from the pool metadata into @buf, which
+ * is interpreted as a struct rmr_pool_md.
+ *
+ * @buf is always zeroed first. On any failure (buffer too small, own srv_md
+ * not found, no free slot in the buffer) the buffer is left zeroed and an
+ * error is logged.
+ */
+static void rmr_srv_update_md_buf(struct rmr_srv_pool *srv_pool, void *buf, size_t buflen)
+{
+	struct rmr_pool *pool = srv_pool->pool;
+	struct rmr_pool_md *pool_md = &pool->pool_md;
+	struct rmr_pool_md *buf_md = (struct rmr_pool_md *)buf;
+	u8 member_id = srv_pool->member_id;
+	int idx, buf_idx;
+
+	/* Zero out the buffer in case data is corrupted somehow. */
+	memset(buf, 0, buflen);
+
+	/* Everything below addresses buf as a full struct rmr_pool_md. */
+	if (buflen < sizeof(*buf_md)) {
+		pr_err("%s: buffer of %zu bytes too small for pool md (%zu bytes)\n",
+		       __func__, buflen, sizeof(*buf_md));
+		return;
+	}
+
+	idx = rmr_pool_find_md(pool_md, member_id, false);
+	if (idx < 0) {
+		pr_err("The server pool hasn't updated srv_md yet %d\n", member_id);
+		return;
+	}
+
+	buf_idx = rmr_pool_find_md(buf_md, member_id, true);
+	if (buf_idx < 0) {
+		pr_err("The buffer has no space for the member_id %d\n", member_id);
+		return;
+	}
+
+	memcpy(&buf_md->srv_md[buf_idx], &pool_md->srv_md[idx], sizeof(struct rmr_srv_md));
+}
+
+/*
+ * Transfer the recorded last_io ids into the dirty maps of all *other*
+ * members of the pool.
+ *
+ * For every last_io slot that holds a valid id (not the U64_MAX/U64_MAX
+ * "empty" marker) and that is not already dirty in our own map, the id is
+ * marked dirty in each map whose member_id differs from ours. The slot is
+ * reset to the empty marker only after all maps have been updated, and only
+ * if at least one map consumed it; resetting it inside the per-map loop would
+ * feed the empty marker as an id to every map after the first one.
+ *
+ * Return: 0 on success, -EINVAL if this server's own map does not exist.
+ */
+static int rmr_srv_save_last_io_to_map(struct rmr_pool *pool)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	struct rmr_dirty_id_map *map;
+	int i, j, lock_idx;
+
+	map = rmr_pool_find_map(pool, srv_pool->member_id);
+	if (!map) {
+		pr_err("no map found for member_id %u\n", srv_pool->member_id);
+		return -EINVAL;
+	}
+
+	for (i = 0; i < srv_pool->queue_depth; i++) {
+		rmr_id_t *id = &srv_pool->last_io[i];
+		struct rmr_dirty_id_map *mp;
+		bool consumed = false;
+
+		if (id->a == U64_MAX && id->b == U64_MAX)
+			continue;
+
+		if (rmr_map_check_dirty(map, *id)) {
+			/*
+			 * We already have this id added to our map, and which says
+			 * that its dirty for us. This means that last_io info about
+			 * this id is outdated.
+			 * We honor the info in the map, and skip this entry
+			 */
+			continue;
+		}
+
+		lock_idx = srcu_read_lock(&pool->map_srcu);
+		for (j = 0; j < pool->maps_cnt; j++) {
+			/* maps[] is protected by map_srcu, so use the SRCU accessor */
+			mp = srcu_dereference(pool->maps[j], &pool->map_srcu);
+			if (WARN_ON(!mp) || mp->member_id == srv_pool->member_id)
+				continue;
+
+			rmr_map_set_dirty(mp, *id, 0);
+			consumed = true;
+		}
+		srcu_read_unlock(&pool->map_srcu, lock_idx);
+
+		if (consumed) {
+			/* Clean the entry since it has been used up */
+			id->a = U64_MAX;
+			id->b = U64_MAX;
+		}
+	}
+
+	rmr_srv_mark_maps_dirty(srv_pool);
+	return 0;
+}
+
+/**
+ * process_msg_user_cmd() - Process user command
+ *
+ * @srv_pool:	rmr server pool whose registered store receives the command
+ * @cmd_msg:	pointer to command message. The user data is right after this struct.
+ * @data:	data buffer to be passed down the user
+ * @datalen:	length of the user buffer
+ *
+ * Description:
+ *	Pass down the user command to the user server side.
+ *	The user command data is kept right after the pool command (see arranging of kvec)
+ *
+ * Return:
+ *	0 in case of success
+ *	-EAGAIN if no store is registered
+ *	-EINVAL if the store cannot accept commands (no ops / no submit_cmd)
+ *	otherwise the value returned by the store's submit_cmd() callback
+ *
+ * Context:
+ *	The call goes to the user server side. Care must be taken not to block.
+ */
+static int process_msg_user_cmd(struct rmr_srv_pool *srv_pool,
+				const struct rmr_msg_pool_cmd *cmd_msg, void *data, int datalen)
+{
+	struct rmr_srv_io_store *store = srv_pool->io_store;
+	size_t usr_len = cmd_msg->user_cmd.usr_len;
+
+	pr_debug("%s: cmd_len=%zu usr_len=%zu\n", __func__, sizeof(*cmd_msg), usr_len);
+
+	if (!store) {
+		pr_err("%s: No store registered\n", __func__);
+		return -EAGAIN;
+	}
+
+	/* Same sanity check as rmr_srv_pool_check_store() does before using ops */
+	if (!store->ops || !store->ops->submit_cmd) {
+		pr_err("%s: store has no submit_cmd op assigned\n", __func__);
+		return -EINVAL;
+	}
+
+	/* The user payload starts directly behind the pool command header. */
+	return store->ops->submit_cmd(store->priv, cmd_msg + 1, usr_len, data, datalen);
+}
+
+/*
+ * Unlink @pool_sess from its server session's list, taking the session lock
+ * around the list operation.
+ */
+static void do_sess_leave_srv_sess(struct rmr_srv_pool_sess *pool_sess)
+{
+	struct rmr_srv_sess *owner = pool_sess->srv_sess;
+
+	mutex_lock(&owner->lock);
+	list_del(&pool_sess->srv_sess_entry);
+	mutex_unlock(&owner->lock);
+}
+
+/*
+ * Detach @pool_sess from @pool: under pool->sess_lock remove it from the
+ * pool's session list and erase the pool from the server session's xarray,
+ * then remove the sysfs entry and clear the back pointer to the srv pool.
+ * The pool_sess itself is not freed here.
+ */
+static void sess_leave_pool(struct rmr_pool *pool,
+			    struct rmr_srv_pool_sess *pool_sess)
+{
+	struct rmr_srv_sess *owner = pool_sess->srv_sess;
+
+	pr_info("pool sesss %s leaves pool %s\n",
+		pool_sess->sessname, pool->poolname);
+
+	mutex_lock(&pool->sess_lock);
+	list_del(&pool_sess->pool_entry);
+	xa_erase(&owner->pools, pool->group_id);
+	mutex_unlock(&pool->sess_lock);
+
+	rmr_srv_sysfs_del_sess(pool_sess);
+	pool_sess->srv_pool = NULL;
+}
+
+/* Release the memory of a pool session; the caller must have unlinked it. */
+static void rmr_srv_free_pool_sess(struct rmr_srv_pool_sess *pool_sess)
+{
+	kfree(pool_sess);
+}
+
+/*
+ * Tear down a server session: detach and free every pool session hanging off
+ * it, drop the srv pool reference held for each, remove the session from the
+ * global list and free it.
+ *
+ * For non-sync pool sessions the pool is moved to NO_IO first, since the
+ * session going away is treated as a network disconnect.
+ */
+static void destroy_sess(struct rmr_srv_sess *srv_sess)
+{
+	struct rmr_srv_pool_sess *cur, *next;
+
+	mutex_lock(&srv_sess->lock);
+	list_for_each_entry_safe(cur, next, &srv_sess->pool_sess_list, srv_sess_entry) {
+		struct rmr_srv_pool *srv_pool = cur->srv_pool;
+
+		list_del(&cur->srv_sess_entry);
+
+		/* A network disconnect event */
+		if (!cur->sync)
+			rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_NO_IO);
+
+		sess_leave_pool(srv_pool->pool, cur);
+		rmr_put_srv_pool(srv_pool);
+		rmr_srv_free_pool_sess(cur);
+	}
+	mutex_unlock(&srv_sess->lock);
+
+	xa_destroy(&srv_sess->pools);
+	might_sleep();
+
+	mutex_lock(&g_sess_lock);
+	list_del(&srv_sess->g_list_entry);
+	mutex_unlock(&g_sess_lock);
+
+	mutex_destroy(&srv_sess->lock);
+	kfree(srv_sess);
+}
+
+/*
+ * Detach and free every session that is still joined to @pool, dropping the
+ * srv pool reference held for each of them.
+ *
+ * A NULL @pool is reported and ignored. pool->priv is only read after that
+ * check; reading it in the declaration would dereference the NULL pointer the
+ * check is meant to catch.
+ */
+void rmr_srv_destroy_pool(struct rmr_pool *pool)
+{
+	struct rmr_srv_pool_sess *pool_sess, *tmp;
+	struct rmr_srv_pool *srv_pool;
+
+	if (!pool) {
+		pr_err("%s: pool is empty\n", __func__);
+		return;
+	}
+
+	srv_pool = (struct rmr_srv_pool *)pool->priv;
+
+	list_for_each_entry_safe(pool_sess, tmp, &pool->sess_list, pool_entry) {
+		WARN_ON(!pool_sess->srv_pool);
+
+		do_sess_leave_srv_sess(pool_sess);
+		sess_leave_pool(srv_pool->pool, pool_sess);
+		rmr_put_srv_pool(srv_pool);
+		rmr_srv_free_pool_sess(pool_sess);
+	}
+}
+
+/*
+ * Detach the internal (sync) client pool from @srv_pool and close it.
+ *
+ * Return: 0 on success, -EINVAL if no internal client pool is assigned.
+ */
+int rmr_srv_remove_clt_pool(struct rmr_srv_pool *srv_pool)
+{
+	struct rmr_pool *sync_pool = srv_pool->clt;
+
+	if (!sync_pool) {
+		pr_info("Srv pool %s has no internal clt pool assigned\n",
+			srv_pool->pool->poolname);
+		return -EINVAL;
+	}
+
+	pr_info("from pool %s remove sync (internal) pool %s\n",
+		srv_pool->pool->poolname, sync_pool->poolname);
+
+	/* Unpublish the pointer before the pool is closed. */
+	srv_pool->clt = NULL;
+	rmr_clt_close(sync_pool);
+
+	/*
+	 * NOTE(review): sync_pool is still dereferenced after rmr_clt_close();
+	 * confirm that close does not free the pool.
+	 */
+	pr_info("pool %s removed\n", sync_pool->poolname);
+
+	return 0;
+}
+
+/*
+ * Allocate and register the rmr server session for a newly connected rtrs
+ * session. The new object is added to the global session list and stored as
+ * the rtrs session's private data.
+ *
+ * Return: 0 on success, the error from rtrs_srv_get_path_name() or -ENOMEM.
+ */
+static int create_srv_sess(struct rtrs_srv_sess *rtrs)
+{
+	struct rmr_srv_sess *srv_sess;
+	char sessname[NAME_MAX];
+	int err;
+
+	err = rtrs_srv_get_path_name(rtrs, sessname, sizeof(sessname));
+	if (err) {
+		/* sessname may be unset on failure, so it must not be printed */
+		pr_err("rtrs_srv_get_path_name failed: %d\n", err);
+		return err;
+	}
+
+	srv_sess = kzalloc(sizeof(*srv_sess), GFP_KERNEL);
+	if (!srv_sess)
+		return -ENOMEM;
+
+	/* The lock is initialised exactly once */
+	mutex_init(&srv_sess->lock);
+	srv_sess->rtrs = rtrs;
+	strscpy(srv_sess->sessname, sessname, NAME_MAX);
+	xa_init_flags(&srv_sess->pools, XA_FLAGS_ALLOC);
+	INIT_LIST_HEAD(&srv_sess->pool_sess_list);
+
+	mutex_lock(&g_sess_lock);
+	list_add(&srv_sess->g_list_entry, &g_sess_list);
+	mutex_unlock(&g_sess_lock);
+
+	rtrs_srv_set_sess_priv(rtrs, srv_sess);
+
+	return 0;
+}
+
+/*
+ * rtrs link event callback.
+ *
+ * CONNECTED creates the rmr server session for @rtrs, DISCONNECTED destroys
+ * the session passed in @priv. Any other event is rejected with -EINVAL.
+ *
+ * @priv can be NULL (the DISCONNECTED case already guards against it), so
+ * the unknown-event path must not dereference it unconditionally.
+ */
+static int rmr_srv_link_ev(struct rtrs_srv_sess *rtrs,
+			   enum rtrs_srv_link_ev ev, void *priv)
+{
+	struct rmr_srv_sess *srv_sess = priv;
+
+	switch (ev) {
+	case RTRS_SRV_LINK_EV_CONNECTED:
+		return create_srv_sess(rtrs);
+
+	case RTRS_SRV_LINK_EV_DISCONNECTED:
+		if (WARN_ON(!srv_sess))
+			return -EINVAL;
+
+		destroy_sess(srv_sess);
+		return 0;
+
+	default:
+		pr_warn("Received unknown rtrs session event %d from session %s\n",
+			ev, srv_sess ? srv_sess->sessname : "<unknown>");
+		return -EINVAL;
+	}
+}
+
+/*
+ * Look up the pool session named @sessname in @pool's session list.
+ * Callers in this file hold pool->sess_lock around the call.
+ *
+ * Return: the matching pool session, or NULL if there is none.
+ */
+static struct rmr_srv_pool_sess *__find_sess_in_pool(struct rmr_pool *pool,
+						     const char *sessname)
+{
+	struct rmr_srv_pool_sess *cur;
+
+	list_for_each_entry(cur, &pool->sess_list, pool_entry) {
+		if (strcmp(cur->sessname, sessname) == 0)
+			return cur;
+	}
+
+	return NULL;
+}
+
+/*
+ * Attach @pool_sess to @pool under pool->sess_lock: register the pool in the
+ * server session's xarray (keyed by group_id), create the sysfs entry and
+ * link the pool session into the pool's session list.
+ *
+ * Return: 0 on success, -EEXIST if a session with the same name is already
+ * in the pool, or the error from xa_store() / rmr_srv_sysfs_add_sess().
+ */
+static int sess_join_pool(struct rmr_pool *pool, struct rmr_srv_pool_sess *pool_sess)
+{
+	struct rmr_srv_sess *srv_sess = pool_sess->srv_sess;
+	int ret;
+
+	mutex_lock(&pool->sess_lock);
+
+	if (__find_sess_in_pool(pool, pool_sess->sessname)) {
+		ret = -EEXIST;
+		goto unlock;
+	}
+
+	ret = xa_err(xa_store(&srv_sess->pools, pool->group_id, pool, GFP_KERNEL));
+	if (ret) {
+		pr_err("can not add pool %s err %d\n", pool->poolname, ret);
+		goto unlock;
+	}
+	pr_info("%s: Added pool %s to rmr_srv_sess %s\n",
+		__func__, pool->poolname, srv_sess->sessname);
+
+	ret = rmr_srv_sysfs_add_sess(pool, pool_sess);
+	if (ret) {
+		pr_err("failed to create sysfs for pool sess %s in pool %s\n",
+		       pool_sess->sessname, pool->poolname);
+
+		/* Roll back the xarray registration done above */
+		xa_erase(&srv_sess->pools, pool->group_id);
+		goto unlock;
+	}
+
+	list_add(&pool_sess->pool_entry, &pool->sess_list);
+
+unlock:
+	mutex_unlock(&pool->sess_lock);
+
+	return ret;
+}
+
+/*
+ * Full removal of one pool session: unlink it from its server session and
+ * from @pool, drop the srv pool reference and free the pool session.
+ */
+static void do_sess_leave_pool(struct rmr_pool *pool, struct rmr_srv_pool_sess *pool_sess)
+{
+	struct rmr_srv_pool *owner = (struct rmr_srv_pool *)pool->priv;
+
+	do_sess_leave_srv_sess(pool_sess);
+	sess_leave_pool(pool, pool_sess);
+
+	rmr_put_srv_pool(owner);
+	rmr_srv_free_pool_sess(pool_sess);
+}
+
+/**
+ * process_msg_pool_info() - Process a POOL_INFO membership change notification
+ *
+ * @pool:		Pool which received the command.
+ * @pool_info_cmd:	The received POOL_INFO command carrying member_id,
+ *			operation and mode.
+ *
+ * Dispatches on the operation (and, for removals, the mode) in the command:
+ *  - ADD:		    handed to rmr_srv_handle_other_member_add(); on
+ *			    failure the error is returned without flushing the
+ *			    pool metadata.
+ *  - REMOVE + DELETE:	    the member is removed through
+ *			    rmr_srv_delete_store_member().
+ *  - REMOVE + DISASSEMBLE: only logged; the member's dirty map is kept.
+ *  - anything else:	    ignored.
+ *
+ * Except for the ADD failure case, the pool metadata is flushed at the end.
+ *
+ * Return:
+ *	0 on success, negative error code on failure.
+ */
+static int process_msg_pool_info(struct rmr_pool *pool,
+				 const struct rmr_msg_pool_info_cmd *pool_info_cmd)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	int ret = 0;
+
+	pr_info("%s: Server pool %s with member_id %u, received pool_info message\n",
+		__func__, pool->poolname, srv_pool->member_id);
+
+	switch (pool_info_cmd->operation) {
+	case RMR_POOL_INFO_OP_ADD:
+		ret = rmr_srv_handle_other_member_add(srv_pool, pool_info_cmd);
+		if (ret) {
+			pr_err("%s: Failed to create maps for other pools: %d\n",
+			       __func__, ret);
+			return ret;
+		}
+		break;
+
+	case RMR_POOL_INFO_OP_REMOVE:
+		if (pool_info_cmd->mode == RMR_POOL_INFO_MODE_DELETE) {
+			pr_info("%s: Member %u got remove of member %u with mode delete\n",
+				__func__, srv_pool->member_id, pool_info_cmd->member_id);
+			rmr_srv_delete_store_member(pool, pool_info_cmd->member_id);
+		} else if (pool_info_cmd->mode == RMR_POOL_INFO_MODE_DISASSEMBLE) {
+			/*
+			 * Do NOT remove the dirty map or stg_members entry for the
+			 * disassembled member.  IOs arriving after this point will
+			 * continue to accumulate dirty entries for that member via
+			 * the piggyback mechanism, so it can resync on reassembly.
+			 */
+			pr_info("%s: Member %u got remove of member %u with mode disassemble, "
+				"preserving dirty map\n",
+				__func__, srv_pool->member_id, pool_info_cmd->member_id);
+		}
+		break;
+
+	default:
+		break;
+	}
+
+	rmr_srv_flush_pool_md(srv_pool);
+
+	return ret;
+}
+
+/*
+ * Allocate a zeroed pool session that ties @srv_sess to @srv_pool. The
+ * session name is copied from the server session and both list heads are
+ * initialised; the object is not linked anywhere yet.
+ *
+ * Return: the new pool session, or ERR_PTR(-ENOMEM).
+ */
+static struct rmr_srv_pool_sess *alloc_pool_sess(struct rmr_srv_pool *srv_pool,
+						 struct rmr_srv_sess *srv_sess)
+{
+	struct rmr_srv_pool_sess *new_sess;
+
+	new_sess = kzalloc_node(sizeof(*new_sess), GFP_KERNEL, NUMA_NO_NODE);
+	if (unlikely(!new_sess)) {
+		pr_err("Failed to allocate session for srv pool %s\n", srv_pool->pool->poolname);
+		return ERR_PTR(-ENOMEM);
+	}
+
+	new_sess->srv_pool = srv_pool;
+	new_sess->srv_sess = srv_sess;
+	strscpy(new_sess->sessname, srv_sess->sessname, NAME_MAX);
+	INIT_LIST_HEAD(&new_sess->pool_entry);
+	INIT_LIST_HEAD(&new_sess->srv_sess_entry);
+
+	return new_sess;
+}
+
+/**
+ * rmr_srv_process_join_create() - Handle the CREATE case of a join_pool message
+ *
+ * @pool:		The pool being created.
+ * @join_pool_cmd:	The received join_pool command carrying the dirty flag
+ *			and the list of other storage members.
+ *
+ * When the command's dirty flag is set, this server's own map is marked fully
+ * dirty. Each member listed in the command is then added through
+ * rmr_srv_add_store_member(); if the entry's c_dirty flag is set, that
+ * member's map is marked fully dirty as well. If anything fails, the members
+ * added by earlier iterations are deleted again.
+ *
+ * Return:
+ *	0 on success, negative error code on failure.
+ */
+static int rmr_srv_process_join_create(struct rmr_pool *pool,
+				       const struct rmr_msg_join_pool_cmd *join_pool_cmd)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	struct rmr_dirty_id_map *dirty_map;
+	int idx, err;
+	u8 other_id;
+
+	/* Mark our own map dirty if the client asked us to. */
+	if (join_pool_cmd->dirty) {
+		dirty_map = rmr_pool_find_map(pool, srv_pool->member_id);
+		if (!dirty_map) {
+			pr_err("%s: No map found for %u\n",
+			       __func__, srv_pool->member_id);
+			return -EINVAL;
+		}
+		rmr_map_set_dirty_all(dirty_map, MAP_NO_FILTER);
+	}
+
+	/* Add the other storage members listed in the create message. */
+	for (idx = 0; idx < join_pool_cmd->mem_info.no_of_stor; idx++) {
+		other_id = join_pool_cmd->mem_info.p_mem_info[idx].member_id;
+
+		err = rmr_srv_add_store_member(pool, other_id);
+		if (err) {
+			pr_err("%s: rmr_srv_add_store_member failed %d\n", __func__, err);
+			goto cleanup;
+		}
+
+		if (!join_pool_cmd->mem_info.p_mem_info[idx].c_dirty)
+			continue;
+
+		dirty_map = rmr_pool_find_map(pool, other_id);
+		if (WARN_ON(!dirty_map)) {
+			/* Drop the entry just added; earlier ones go via cleanup */
+			xa_erase(&pool->stg_members, other_id);
+			err = -EINVAL;
+			goto cleanup;
+		}
+		rmr_map_set_dirty_all(dirty_map, MAP_NO_FILTER);
+	}
+
+	return 0;
+
+cleanup:
+	/* Undo, newest first, the members added before the failing index. */
+	for (idx--; idx >= 0; idx--)
+		rmr_srv_delete_store_member(pool,
+					    join_pool_cmd->mem_info.p_mem_info[idx].member_id);
+	return err;
+}
+
+/*
+ * Handle a permanent leave (not a disassembly): remove every storage member
+ * other than ourselves from the pool.
+ *
+ * The dirty entries and maps of the other storage nodes are no longer needed
+ * once we leave. The map of this storage node is not touched here; it is
+ * created/deleted during register/unregister.
+ */
+static void rmr_srv_process_leave_delete(struct rmr_pool *pool)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	unsigned long member;
+	void *unused;
+
+	xa_for_each(&pool->stg_members, member, unused) {
+		if (member != srv_pool->member_id)
+			rmr_srv_delete_store_member(pool, member);
+	}
+}
+
+/*
+ * Handle a join_pool request from @srv_sess for @pool.
+ *
+ * Validates the request (chunk size, pool state, create vs. non-create),
+ * allocates a pool session and joins it to the pool, sets up the last_io
+ * buffers for the first non-sync join, and for non-sync sessions performs the
+ * pool state transition (CREATED for a create request, NO_IO otherwise unless
+ * already in NO_IO). On success a srv pool reference is taken and the pool
+ * session is linked into the server session's list.
+ *
+ * Locking: srv_sess->lock is held for the whole call, srv_pool->srv_pool_lock
+ * from the state check until success or failure.
+ *
+ * On failure only resources allocated by this call are released; in
+ * particular a last_io_idx buffer that existed before the call is kept.
+ *
+ * Return: 0 on success, negative error code on failure.
+ */
+static int process_msg_join_pool(struct rmr_pool *pool, struct rmr_srv_sess *srv_sess,
+				 struct rtrs_srv_sess *rtrs, bool sync,
+				 const struct rmr_msg_join_pool_cmd *join_pool_cmd)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	struct rmr_srv_pool_sess *pool_sess;
+	int ret = 0, i;
+	bool alloced_last_io = false;
+	bool alloced_last_io_idx = false;
+
+	pr_info("Client %s requests to join pool %s (state=%d)\n",
+		srv_sess->sessname, pool->poolname, atomic_read(&srv_pool->state));
+
+	mutex_lock(&srv_sess->lock);
+
+	/*
+	 * Here we only do chunk size check,
+	 * to make sure different storage nodes do not use different chunk sizes.
+	 */
+	if (join_pool_cmd->chunk_size && pool->chunk_size != join_pool_cmd->chunk_size) {
+		pr_err("pool %s has chunksize %u != msg chunksize %u\n",
+		       pool->poolname, pool->chunk_size, join_pool_cmd->chunk_size);
+		ret = -EINVAL;
+		goto unlock;
+	}
+
+	mutex_lock(&srv_pool->srv_pool_lock);
+	if (atomic_read(&srv_pool->state) == RMR_SRV_POOL_STATE_EMPTY) {
+		pr_err("%s: pool %s has no store registered; join rejected\n",
+		       __func__, pool->poolname);
+		ret = -EINVAL;
+		goto unlock_srv_pool_lock;
+	}
+
+	if (!sync) {
+		if (join_pool_cmd->create) {
+			if (srv_pool->last_io || srv_pool->last_io_idx) {
+				pr_err("%s: pool %s already has last_io buffer allocated\n",
+				       __func__, pool->poolname);
+				ret = -EEXIST;
+				goto unlock_srv_pool_lock;
+			}
+
+			if (!srv_pool->marked_create) {
+				pr_err("%s: pool %s not in create state\n",
+				       __func__, pool->poolname);
+				ret = -EINVAL;
+				goto unlock_srv_pool_lock;
+			}
+		} else if (srv_pool->marked_create) {
+			pr_err("%s: pool %s should not be in create state\n",
+			       __func__, pool->poolname);
+			ret = -EINVAL;
+			goto unlock_srv_pool_lock;
+		}
+	}
+
+	pool_sess = alloc_pool_sess(srv_pool, srv_sess);
+	if (IS_ERR(pool_sess)) {
+		pr_err("failed to allc pool_sees for pool %s sev_sess %s: %pe\n",
+		       pool->poolname, srv_sess->sessname, pool_sess);
+		ret = PTR_ERR(pool_sess);
+		goto unlock_srv_pool_lock;
+	}
+	srv_pool->queue_depth = join_pool_cmd->queue_depth;
+
+	ret = sess_join_pool(pool, pool_sess);
+	if (ret) {
+		pr_err("Failed to join pool\n");
+		goto free_sess;
+	}
+	pool_sess->sync = sync;
+
+	if (!pool_sess->sync && !srv_pool->last_io) {
+		/* Joining for the first time */
+		srv_pool->last_io = kcalloc(srv_pool->queue_depth, sizeof(*srv_pool->last_io),
+					    GFP_KERNEL);
+		if (!srv_pool->last_io) {
+			pr_err("Memory allocation failed for srv_pool->last_io\n");
+			ret = -ENOMEM;
+			goto sess_leave;
+		}
+		alloced_last_io = true;
+
+		/* The previous last_io buffer exists. */
+		if (srv_pool->last_io_idx) {
+			memcpy(srv_pool->last_io, srv_pool->last_io_idx,
+			       rmr_last_io_len(srv_pool->queue_depth));
+		} else {
+			/* No previous data: start with every slot marked empty */
+			for (i = 0; i < srv_pool->queue_depth; i++) {
+				srv_pool->last_io[i].a = U64_MAX;
+				srv_pool->last_io[i].b = U64_MAX;
+			}
+
+			srv_pool->last_io_idx = kcalloc(srv_pool->queue_depth,
+						    sizeof(*srv_pool->last_io_idx), GFP_KERNEL);
+			if (!srv_pool->last_io_idx) {
+				ret = -ENOMEM;
+				goto free_last_io;
+			}
+			alloced_last_io_idx = true;
+		}
+		/* The product is a size_t, hence %zu */
+		pr_info("Allocated %zu B last_io buffer for pool %s\n",
+			srv_pool->queue_depth * sizeof(*srv_pool->last_io), pool->poolname);
+	}
+
+	/*
+	 * Join/Rejoin messages from sync sessions do not affect our state.
+	 *
+	 * For non-sync sessions, if our state is NO_IO, pserver can either send a,
+	 * - rejoin message in case our state NO_IO due to network/IO issue
+	 * - join message in case pserver crashed
+	 * hence, no state transition is needed.
+	 */
+	if (!pool_sess->sync) {
+		if (join_pool_cmd->create) {
+			/*
+			 * First-time pool creation: set up member info and maps,
+			 * then move to CREATED awaiting enable_pool(1).
+			 */
+			ret = rmr_srv_process_join_create(pool, join_pool_cmd);
+			if (ret) {
+				pr_err("%s: rmr_srv_process_join_create failed %d\n",
+				       __func__, ret);
+				goto free_last_io;
+			}
+
+			/*
+			 * In the CREATE path pool_md has only magic set; all other
+			 * header fields are normally populated later by
+			 * RMR_CMD_SEND_MD_BUF.  Initialise them now so that
+			 * queue_depth (and the bitmap/last_io offsets derived from
+			 * it) are correct before the first on-demand map flush fires.
+			 */
+			pool->pool_md.queue_depth = join_pool_cmd->queue_depth;
+			pool->pool_md.chunk_size  = pool->chunk_size;
+			pool->pool_md.mapped_size = pool->mapped_size;
+			pool->pool_md.group_id    = pool->group_id;
+			strscpy(pool->pool_md.poolname, pool->poolname,
+				sizeof(pool->pool_md.poolname));
+			rmr_srv_mark_pool_md_dirty(srv_pool);
+			rmr_srv_mark_maps_dirty(srv_pool);
+
+			ret = rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_CREATED);
+			if (ret < 0)
+				goto leave_delete;
+
+			srv_pool->marked_create = false;
+		} else if (atomic_read(&srv_pool->state) != RMR_SRV_POOL_STATE_NO_IO) {
+			/*
+			 * Assemble or rejoin: a map update is needed before IOs
+			 * can resume, so move to NO_IO. If we are already in
+			 * NO_IO (e.g. pserver reconnecting after a network event
+			 * that already drove us there), no transition is needed.
+			 */
+			ret = rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_NO_IO);
+			if (ret < 0)
+				goto leave_delete;
+		}
+	}
+
+	mutex_unlock(&srv_pool->srv_pool_lock);
+
+	rmr_get_srv_pool(srv_pool);
+	list_add_tail(&pool_sess->srv_sess_entry, &srv_sess->pool_sess_list);
+
+	mutex_unlock(&srv_sess->lock);
+
+	return 0;
+
+leave_delete:
+	if (!pool_sess->sync && join_pool_cmd->create)
+		rmr_srv_process_leave_delete(pool);
+free_last_io:
+	/* Release only what this call allocated */
+	if (alloced_last_io) {
+		kfree(srv_pool->last_io);
+		srv_pool->last_io = NULL;
+	}
+	if (alloced_last_io_idx) {
+		kfree(srv_pool->last_io_idx);
+		srv_pool->last_io_idx = NULL;
+	}
+sess_leave:
+	sess_leave_pool(pool, pool_sess);
+free_sess:
+	rmr_srv_free_pool_sess(pool_sess);
+unlock_srv_pool_lock:
+	mutex_unlock(&srv_pool->srv_pool_lock);
+unlock:
+	mutex_unlock(&srv_sess->lock);
+	return ret;
+}
+
+/*
+ * Move the pool to NO_IO, ask the sync thread (if any) to stop and wait until
+ * it reports SYNC_THREAD_STOPPED.
+ *
+ * The stop request is installed with cmpxchg so that it can never replace a
+ * STOPPED state. With a plain read followed by atomic_set(), a thread that
+ * stops in between would have its STOPPED overwritten by REQ_STOP and the
+ * wait loop below would never terminate.
+ */
+void rmr_srv_stop_sync_and_go_offline(struct rmr_pool *pool)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	int cur, prev;
+
+	rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_NO_IO);
+
+	cur = atomic_read(&srv_pool->thread_state);
+	while (cur != SYNC_THREAD_STOPPED) {
+		prev = atomic_cmpxchg(&srv_pool->thread_state, cur, SYNC_THREAD_REQ_STOP);
+		if (prev == cur) {
+			wake_up_process(srv_pool->th_tsk);
+			break;
+		}
+		/* State changed under us; retry with the value we observed */
+		cur = prev;
+	}
+
+	/* Poll once a second until the thread has acknowledged the stop */
+	while (atomic_read(&srv_pool->thread_state) != SYNC_THREAD_STOPPED) {
+		set_current_state(TASK_INTERRUPTIBLE);
+		schedule_timeout(msecs_to_jiffies(1000));
+	}
+}
+
+/*
+ * Handle a leave_pool request from session @sess for @pool.
+ *
+ * The request must carry this server's own member_id and the session must be
+ * joined to the pool. The pool session is removed and freed, and the delete
+ * flag from the message is recorded in srv_pool->marked_delete.
+ *
+ * For non-sync sessions the sync thread is additionally stopped and the pool
+ * taken offline; then either all other storage members are deleted (delete
+ * request) or the maps are synced to the store and a zeroed last_io area is
+ * written out (disassemble). Finally the pool state becomes REGISTERED if a
+ * store is still attached, EMPTY otherwise.
+ *
+ * Return: 0 on success, -ENOENT if the member_id does not match or the
+ * session is not part of the pool. A failing last_io metadata write is only
+ * logged and does not change the return value.
+ */
+static int process_msg_leave_pool(struct rmr_pool *pool, struct rmr_srv_sess *sess, bool sync,
+				  const struct rmr_msg_leave_pool_cmd *leave_pool_cmd)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	struct rmr_srv_pool_sess *pool_sess;
+	u64 last_io_len;
+	int ret = 0;
+	void *buf;
+
+	pr_info("Session %s requests to leave pool %d\n", sess->sessname,
+		leave_pool_cmd->member_id);
+
+	if (srv_pool->member_id != leave_pool_cmd->member_id) {
+		pr_err("%s: For sess %s, Srv pool member_id %d, Message member_id %d\n",
+		       __func__, sess->sessname, srv_pool->member_id, leave_pool_cmd->member_id);
+		return -ENOENT;
+	}
+
+	mutex_lock(&pool->sess_lock);
+	pool_sess = __find_sess_in_pool(pool, sess->sessname);
+	if (!pool_sess) {
+		mutex_unlock(&pool->sess_lock);
+		pr_err("Session %s is not in pool %s\n", sess->sessname,
+		       pool->poolname);
+		return -ENOENT;
+	}
+	mutex_unlock(&pool->sess_lock);
+
+	/*
+	 * NOTE(review): pool_sess is used after sess_lock was dropped; confirm
+	 * that nothing else (e.g. a concurrent session teardown) can free it
+	 * in between.
+	 */
+	do_sess_leave_pool(pool, pool_sess);
+
+	mutex_lock(&srv_pool->srv_pool_lock);
+	srv_pool->marked_delete = leave_pool_cmd->delete;
+	mutex_unlock(&srv_pool->srv_pool_lock);
+
+	if (!sync) {
+		/*
+		 * Stop the sync thread if its running, and go offline.
+		 */
+		rmr_srv_stop_sync_and_go_offline(pool);
+
+		if (leave_pool_cmd->delete) {
+			rmr_srv_process_leave_delete(pool);
+		} else {
+			/*
+			 * Disassemble: flush the dirty map to disk first so that
+			 * the on-disk map reflects all dirty entries accumulated
+			 * up to this point.  On reassembly the map is read back
+			 * and used to drive resync of any members that missed IOs.
+			 */
+			rmr_srv_md_maps_sync(pool);
+
+			/*
+			 * Clear last_io and persist it to disk so that it is not
+			 * used after reassembly.  Note: maps are always flushed
+			 * above regardless of whether last_io is valid; the two
+			 * operations are independent.
+			 */
+			last_io_len = rmr_last_io_len(pool->pool_md.queue_depth);
+
+			if (!srv_pool->last_io || !last_io_len)
+				goto change_state;
+
+			/*
+			 * NOTE(review): the buffers are cleared to all-zero here,
+			 * while other code in this file uses U64_MAX/U64_MAX as the
+			 * "empty slot" marker for last_io - confirm that a zeroed
+			 * slot is never interpreted as a valid id.
+			 */
+			memset(srv_pool->last_io, 0, last_io_len);
+			if (srv_pool->last_io_idx)
+				memset(srv_pool->last_io_idx, 0, last_io_len);
+
+			/* Zero-filled scratch buffer that is written to the store */
+			buf = kzalloc(last_io_len, GFP_KERNEL);
+			if (!buf)
+				goto change_state;
+
+			ret = process_md_io(pool, NULL,
+					    RMR_LAST_IO_OFFSET,
+					    last_io_len,
+					    RMR_OP_MD_WRITE, buf);
+			if (ret) {
+				/* Best effort: the failure is logged but not returned */
+				pr_err("%s: For pool %s process_md_io failed\n",
+				       __func__, pool->poolname);
+			}
+			kfree(buf);
+		}
+
+change_state:
+		/*
+		 * All sessions have left. Transition back to REGISTERED if the
+		 * backend store is still present, or to EMPTY if it is not.
+		 */
+		mutex_lock(&srv_pool->srv_pool_lock);
+		if (srv_pool->io_store)
+			rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_REGISTERED);
+		else
+			rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_EMPTY);
+		mutex_unlock(&srv_pool->srv_pool_lock);
+	}
+
+	return 0;
+}
+
+/**
+ * process_msg_map_clear() - Process an RMR_MSG_MAP_CLEAR message
+ *
+ * @srv_sess:	Session on which the message was received
+ * @usr:	Message payload, read as a struct rmr_msg_io
+ *
+ * Description:
+ *	Resolves the pool from the group_id in the message header, looks up the
+ *	dirty map of the member named in the message and clears the dirty entry
+ *	for the id carried in the message. rmr_srv_mark_maps_dirty() is then
+ *	called for the pool, whether or not an entry was actually removed.
+ *
+ * Return:
+ *	0 on success
+ *	Error from the pool lookup if the pool is not found
+ *	-EINVAL if no map exists for the member
+ */
+static int process_msg_map_clear(struct rmr_srv_sess *srv_sess,
+				 const void *usr)
+{
+	const struct rmr_msg_io *msg = usr;
+	u32 group_id = le32_to_cpu(msg->hdr.group_id);
+	u8 member_id = msg->member_id;
+	struct rmr_dirty_id_map *map;
+	struct rmr_map_entry *entry;
+	struct rmr_pool *pool;
+	rmr_id_t id;
+	int err = 0;
+
+	id.a = le64_to_cpu(msg->id_a);
+	id.b = le64_to_cpu(msg->id_b);
+
+	pr_debug("received map clear msg, id (%llu, %llu), member_id %u\n",
+		 id.a, id.b, member_id);
+
+	pool = rmr_srv_sess_get_pool(srv_sess, group_id);
+	if (IS_ERR(pool)) {
+		pr_err_ratelimited("Got I/O request on session %s for unknown pool: %pe\n",
+				   srv_sess->sessname, pool);
+		return PTR_ERR(pool);
+	}
+
+	map = rmr_pool_find_map(pool, member_id);
+	if (!map) {
+		/* TODO: decide whether to initialize a map here instead of failing. */
+		pr_err("no map found for member_id %u\n", member_id);
+		err = -EINVAL;
+		goto put_pool;
+	}
+
+	entry = rmr_map_unset_dirty(map, id, MAP_NO_FILTER);
+	if (entry) {
+		/* We do not need any rcu protection here since it is deleted by the other
+		 * rmr server. And sync can only be done for entries that are
+		 * dirty for this particular server.
+		 */
+		kmem_cache_free(rmr_map_entry_cachep, entry);
+	}
+	rmr_srv_mark_maps_dirty((struct rmr_srv_pool *)pool->priv);
+
+put_pool:
+	rmr_srv_sess_put_pool(pool);
+	return err;
+}
+
+/**
+ * process_msg_map_add() - Process an RMR_MSG_MAP_ADD message
+ *
+ * @srv_sess:	Session on which the message was received
+ * @usr:	Message payload, read as a struct rmr_msg_io
+ *
+ * Description:
+ *	For each of the msg->failed_cnt member ids listed in msg->failed_id[],
+ *	the id carried in the message is set dirty in that member's map and the
+ *	map's check_state is reset to RMR_MAP_STATE_NO_CHECK. The pool map
+ *	version is raised to the one in the message if that one is newer.
+ *
+ *	If a member has no map, processing stops with -EINVAL. Maps updated
+ *	before that point keep their change, so the pool metadata and the maps
+ *	are still marked dirty in that case.
+ *
+ * Return:
+ *	0 on success
+ *	Error from the pool lookup if the pool is not found
+ *	-EINVAL if no map exists for one of the listed members
+ */
+static int process_msg_map_add(struct rmr_srv_sess *srv_sess,
+			       const void *usr)
+{
+	const struct rmr_msg_io *msg = usr;
+	u32 group_id = le32_to_cpu(msg->hdr.group_id);
+	u64 msg_map_ver = le64_to_cpu(msg->map_ver);
+	struct rmr_srv_pool *srv_pool;
+	struct rmr_dirty_id_map *map;
+	struct rmr_pool *pool;
+	rmr_id_t id;
+	int i, ret = 0;
+
+	/* Convert once; the wire format is little endian. */
+	id.a = le64_to_cpu(msg->id_a);
+	id.b = le64_to_cpu(msg->id_b);
+
+	pr_debug("received map add member_id %u, id (%llu %llu)\n",
+		 msg->member_id, id.a, id.b);
+
+	pool = rmr_srv_sess_get_pool(srv_sess, group_id);
+	if (IS_ERR(pool)) {
+		pr_err_ratelimited("Got I/O request on session %s for unknown pool: %pe\n",
+				   srv_sess->sessname, pool);
+		return PTR_ERR(pool);
+	}
+
+	for (i = 0; i < msg->failed_cnt; i++) {
+		map = rmr_pool_find_map(pool, msg->failed_id[i]);
+		if (!map) {
+			pr_err("no map found for member_id %u\n", msg->failed_id[i]);
+			ret = -EINVAL;
+			break;
+		}
+
+		atomic_set(&map->check_state, RMR_MAP_STATE_NO_CHECK);
+		rmr_map_set_dirty(map, id, 0);
+	}
+
+	/*
+	 * i is the number of maps that were actually updated, also when the
+	 * loop stopped early on a missing map.
+	 */
+	if (i) {
+		if (msg_map_ver > pool->map_ver)
+			pool->map_ver = msg_map_ver;
+
+		srv_pool = (struct rmr_srv_pool *)pool->priv;
+		rmr_srv_mark_pool_md_dirty(srv_pool);
+		rmr_srv_mark_maps_dirty(srv_pool);
+	}
+
+	rmr_srv_sess_put_pool(pool);
+
+	return ret;
+}
+
+/**
+ * rmr_srv_set_pool_mm() - Set the rmr srv pool to maintenance mode
+ *
+ * @srv_pool:	The rmr srv pool to set in maintenance mode
+ *
+ * Description:
+ *	While in maintenance mode, we do not serve IOs either, so we set state to NO_IO.
+ *	If the state change fails, the maintenance_mode flag is put back to the value
+ *	it had on entry so that flag and pool state do not disagree.
+ *
+ * Return:
+ *	0 on success
+ *	Error value on failure
+ */
+static int rmr_srv_set_pool_mm(struct rmr_srv_pool *srv_pool)
+{
+	bool was_mm = srv_pool->maintenance_mode;
+	int ret;
+
+	srv_pool->maintenance_mode = true;
+
+	ret = rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_NO_IO);
+	if (ret < 0)
+		srv_pool->maintenance_mode = was_mm;
+
+	return ret;
+}
+
+/**
+ * rmr_srv_unset_pool_mm() - Clear the rmr srv pool maintenance mode
+ *
+ * @srv_pool:	The rmr srv pool to clear maintenance mode of
+ *
+ * Description:
+ *	Clears the maintenance_mode flag and calls rmr_srv_flush_pool_md() for the
+ *	pool. The pool state itself is not changed here.
+ *
+ * Return:
+ *	Always 0
+ */
+static int rmr_srv_unset_pool_mm(struct rmr_srv_pool *srv_pool)
+{
+	srv_pool->maintenance_mode = false;
+	rmr_srv_flush_pool_md(srv_pool);
+
+	return 0;
+}
+
+/**
+ * process_msg_enable_pool() - Process RMR_CMD_ENABLE_POOL command
+ *
+ * @pool:		Pool which received the command
+ * @sess:		Session the command arrived on (used for logging)
+ * @sync:		Whether the command was sent from an internal (sync) rmr-client or not
+ * @enable_pool_cmd:	Command payload; ->enable selects enable or disable
+ *
+ * Description:
+ *	Three cases are handled for non-sync senders:
+ *	disable			- enter maintenance mode (from NORMAL or NO_IO only)
+ *	enable, in maintenance	- leave maintenance mode
+ *	enable, otherwise	- go to NORMAL (from CREATED or NO_IO only)
+ *	If the selected helper returns a negative value, the pool state is set
+ *	back to the value read on entry.
+ *
+ * Return:
+ *	0 on success, and always for sync senders
+ *	-EINVAL if the current state does not allow the request
+ *	Negative error from the helper otherwise
+ */
+static int process_msg_enable_pool(struct rmr_pool *pool, struct rmr_srv_sess *sess, bool sync,
+				   const struct rmr_msg_enable_pool_cmd *enable_pool_cmd)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	enum rmr_srv_pool_state old_state = atomic_read(&srv_pool->state);
+	int ret;
+
+	/* Enable/Disable messages from sync sessions do not affect us. */
+	if (sync) {
+		pr_info("%s: From sync sess %s, for pool %s\n", __func__, sess->sessname,
+			pool->poolname);
+		return 0;
+	}
+
+	pr_info("Client %s requests to set enable=%d pool %s current state %s\n",
+		sess->sessname, enable_pool_cmd->enable, pool->poolname,
+		rmr_get_srv_pool_state_name(old_state));
+
+	if (!enable_pool_cmd->enable) {
+		/* Disable: only valid while NORMAL or NO_IO. */
+		switch (old_state) {
+		case RMR_SRV_POOL_STATE_NORMAL:
+		case RMR_SRV_POOL_STATE_NO_IO:
+			break;
+		default:
+			pr_err("%s: pool %s can only disable from NORMAL or NO_IO state (current: %s)\n",
+			       __func__, pool->poolname,
+			       rmr_get_srv_pool_state_name(old_state));
+			return -EINVAL;
+		}
+
+		ret = rmr_srv_set_pool_mm(srv_pool);
+	} else if (srv_pool->maintenance_mode) {
+		/* Enable while in maintenance mode: just leave maintenance mode. */
+		ret = rmr_srv_unset_pool_mm(srv_pool);
+	} else {
+		/*
+		 * Plain enable.
+		 * CREATED -> NORMAL: initial enable after create-mode join.
+		 * NO_IO -> NORMAL: was_last_authoritative recovery (pserver
+		 * enables this node directly without a map update because its
+		 * dirty map is already authoritative).
+		 */
+		switch (old_state) {
+		case RMR_SRV_POOL_STATE_CREATED:
+		case RMR_SRV_POOL_STATE_NO_IO:
+			break;
+		default:
+			pr_err("%s: pool %s cannot be enabled in state %s\n",
+			       __func__, pool->poolname,
+			       rmr_get_srv_pool_state_name(old_state));
+			return -EINVAL;
+		}
+
+		ret = rmr_srv_set_pool_state_normal(srv_pool);
+	}
+
+	if (ret >= 0)
+		return 0;
+
+	/* Put srv pool state back to the one seen on entry. */
+	atomic_set(&srv_pool->state, old_state);
+	return ret;
+}
+
+/**
+ * process_msg_map_ready() - Process RMR_CMD_MAP_READY command
+ *
+ * @pool:		Pool which received the command
+ * @sync:		Whether the command was sent from an internal (sync) rmr-client or not
+ *
+ * Return:
+ *	0 on success
+ *	Negative errno on failure
+ *
+ * Description:
+ *	A RMR_CMD_MAP_READY command is the first command that is sent to a storage node which will
+ *	receive a map from another storage node as part of a map update.
+ *
+ *	It checks whether this storage node is ready and in an expected state to receive a map.
+ *	On success all dirty maps of the pool are cleared and map_update_state moves to
+ *	MAP_UPDATE_STATE_READY. The whole check-and-update runs under srv_pool_lock.
+ */
+static int process_msg_map_ready(struct rmr_pool *pool, bool sync)
+{
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	struct rmr_dirty_id_map *map;
+	int i, err = 0, pool_state;
+
+	mutex_lock(&srv_pool->srv_pool_lock);
+	pool_state = atomic_read(&srv_pool->state);
+
+	/* A map update from another storage node is not allowed. */
+	if (sync) {
+		pr_err("%s: (sync) Cannot receive map from other storage nodes\n", __func__);
+		err = -EINVAL;
+		goto out;
+	}
+
+	/*
+	 * A map update from pserver should start only when in,
+	 * NO_IO - after a network/IO error
+	 * CREATED - For extend (This is not nice.
+	 *			 Extend should inform the storage node that it is being
+	 *			 used for an extend leg for an already existing node, and
+	 *			 the state should be set accordingly. So that we can allow
+	 *			 this only when in NO_IO state.)
+	 */
+	if (pool_state != RMR_SRV_POOL_STATE_NO_IO && pool_state != RMR_SRV_POOL_STATE_CREATED) {
+		pr_err("(non-sync) pool state not correct %d\n", pool_state);
+		err = -EINVAL;
+		goto out;
+	}
+
+	/*
+	 * We seem to be in process of another map update.
+	 */
+	if (srv_pool->map_update_state != MAP_UPDATE_STATE_DISABLED) {
+		pr_err("rmr_srv_send_map Map update already in progress\n");
+		err = -EINVAL;
+		goto out;
+	}
+
+	/*
+	 * If pserver is instructing us to receive a map, then the map we
+	 * hold is meaningless.
+	 */
+	mutex_lock(&pool->maps_lock);
+	for (i = 0; i < RMR_POOL_MAX_SESS; i++) {
+		map = rcu_dereference_protected(pool->maps[i],
+						lockdep_is_held(&pool->maps_lock));
+		if (!map)
+			continue;
+
+		rmr_map_unset_dirty_all(map);
+	}
+	mutex_unlock(&pool->maps_lock);
+	rmr_srv_mark_maps_dirty(srv_pool);
+
+	srv_pool->map_update_state = MAP_UPDATE_STATE_READY;
+
+	pr_info("%s: process_msg_cmd: moved to MAP_UPDATE_STATE_READY\n", __func__);
+
+out:
+	mutex_unlock(&srv_pool->srv_pool_lock);
+	return err;
+}
+
+/**
+ * process_msg_cmd_handler() - Processes rmr command message
+ *
+ * @work:	scheduled work structure
+ *
+ * Description:
+ *	The command messages being processed here, can be broadly divided into 2 categories.
+ *	Ones which are able to use the rsp buffer to send back status.
+ *	Ones which cannot use the rsp buffer to send back status. These ones use the rsp buffer
+ *	for other purposes; like sending map data, or read user rsp buffer.
+ *
+ *	Commands of the first kind leave through the "out" label, which fills in err,
+ *	member_id and cmd_type of the response. Commands of the second kind jump to
+ *	"out_no_rsp" and leave the buffer untouched. In both cases the rtrs operation is
+ *	completed with err, the srv pool reference taken by the scheduler is dropped and
+ *	the work item is freed.
+ *
+ * Context:
+ *	Execution time depends on the command. It may take a long time for commands which sends
+ *	data (map).
+ */
+static void process_msg_cmd_handler(struct work_struct *work)
+{
+	struct rmr_cmd_work_info *work_info = container_of(work, struct rmr_cmd_work_info, cmd_work);
+	struct rmr_pool *pool = work_info->pool;
+	struct rmr_srv_pool *srv_pool = (struct rmr_srv_pool *)pool->priv;
+	struct rmr_srv_sess *sess = work_info->sess;
+	struct rtrs_srv_sess *rtrs = work_info->rtrs;
+	const struct rmr_msg_pool_cmd *cmd_msg = work_info->cmd_msg;
+	struct rmr_dirty_id_map *map;
+	u8 sync, flags;
+	u64 src_mapped_size;
+	int md_i, err = 0;
+
+	/*
+	 * The switch cases below are used by either map sending node,
+	 * or the node which is to receive the map, but not both.
+	 */
+	switch (cmd_msg->cmd_type) {
+	case RMR_CMD_REJOIN_POOL:
+		/*
+		 * For now, we do not have any difference between join and
+		 * rejoin on the storage server side
+		 */
+	case RMR_CMD_JOIN_POOL:
+		/*
+		 * Server node, received a request for a new session
+		 */
+		err = process_msg_join_pool(pool, sess, rtrs, cmd_msg->sync,
+					    &cmd_msg->join_pool_cmd);
+		if (err) {
+			pr_err("process_msg_join_pool failed with err %d\n", err);
+			goto out;
+		}
+		work_info->rsp->join_pool_cmd_rsp.chunk_size = pool->chunk_size;
+
+		if (pool->mapped_size) {
+			work_info->rsp->join_pool_cmd_rsp.mapped_size = pool->mapped_size;
+			pr_info("srv pool %s sets mapped size %llu\n",
+			       pool->poolname, pool->mapped_size);
+		} else
+			work_info->rsp->join_pool_cmd_rsp.mapped_size = 0;
+
+		break;
+	case RMR_CMD_POOL_INFO:
+		/*
+		 * Server node, received pool info command
+		 */
+		err = process_msg_pool_info(pool, &cmd_msg->pool_info_cmd);
+		if (err) {
+			pr_err("process_msg_pool_info failed with err %d\n", err);
+			goto out;
+		}
+
+		break;
+	case RMR_CMD_LEAVE_POOL:
+		err = process_msg_leave_pool(pool, sess, cmd_msg->sync, &cmd_msg->leave_pool_cmd);
+		if (err) {
+			pr_err("process_msg_leave_pool failed with err %d\n", err);
+			goto out;
+		}
+
+		break;
+	case RMR_CMD_ENABLE_POOL:
+		err = process_msg_enable_pool(pool, sess, cmd_msg->sync, &cmd_msg->enable_pool_cmd);
+		if (err) {
+			pr_err("process_msg_enable_pool failed with err %d\n", err);
+			goto out;
+		}
+
+		break;
+	case RMR_CMD_MAP_READY:
+		/*
+		 * Map receiving node.
+		 * Getting ready to receive dirty map
+		 */
+		pr_info("%s: RMR_CMD_MAP_READY\n", __func__);
+
+		err = process_msg_map_ready(pool, cmd_msg->sync);
+		if (err) {
+			pr_err("process_msg_map_ready failed with err %d\n", err);
+			goto out;
+		}
+
+		break;
+	case RMR_CMD_MAP_SEND:
+		/*
+		 * Map sending node.
+		 * Send map to the node with member_id == map_send_cmd->receiver_member_id
+		 */
+		pr_info("%s: RMR_CMD_MAP_SEND\n", __func__);
+
+		err = rmr_clt_send_map(pool, srv_pool->clt, &cmd_msg->map_send_cmd, MAP_NO_FILTER);
+		if (err) {
+			pr_err("rmr_clt_send_map failed with err %d\n", err);
+			goto out;
+		}
+
+		break;
+	case RMR_CMD_SEND_MAP_BUF:
+		/*
+		 * Map receiving node.
+		 * Received the map from another node. Save it.
+		 */
+		pr_info("%s: RMR_CMD_SEND_MAP_BUF\n", __func__);
+
+		if (srv_pool->map_update_state != MAP_UPDATE_STATE_READY) {
+			pr_err("rmr_srv_send_map Node not ready to receive map\n");
+			err = -EINVAL;
+			goto out;
+		}
+
+		err = rmr_pool_save_map(pool, work_info->data, work_info->datalen,
+					false);
+		if (err) {
+			if (!cmd_msg->sync)
+				rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_NO_IO);
+
+			pr_err("rmr_pool_save_map failed\n");
+			goto out;
+		}
+		break;
+	case RMR_CMD_MAP_BUF_DONE:
+		/*
+		 * Map receiving node.
+		 * A confirmation that all map updates have been sent.
+		 */
+		pr_info("%s: RMR_CMD_MAP_BUF_DONE\n", __func__);
+
+		if (srv_pool->map_update_state != MAP_UPDATE_STATE_READY) {
+			pr_err("rmr_srv_send_map Node state not correct\n");
+			err = -EINVAL;
+			goto out;
+		}
+
+		if (cmd_msg->map_buf_done_cmd.map_version < pool->map_ver) {
+			pr_err("Map version received (%llu) is older than ours (%llu)\n",
+			       cmd_msg->map_buf_done_cmd.map_version, pool->map_ver);
+			err = -EINVAL;
+			goto out;
+		}
+
+		pool->map_ver = cmd_msg->map_buf_done_cmd.map_version;
+		rmr_srv_mark_pool_md_dirty(srv_pool);
+
+		srv_pool->map_update_state = MAP_UPDATE_STATE_DONE;
+
+		break;
+	case RMR_CMD_MAP_DONE:
+		/*
+		 * Map receiving node.
+		 * A confirmation from the client, that map update was done successfully or not.
+		 */
+		pr_info("%s: RMR_CMD_MAP_DONE\n", __func__);
+
+		/*
+		 * On a successful map update, we go to NORMAL state.
+		 *
+		 * map_done_cmd.enable says whether this map update should make us go to
+		 * NORMAL state or not. This is controlled by the pserver.
+		 *
+		 * If the map update did not reach MAP_UPDATE_STATE_DONE, the map we hold
+		 * is not complete, so the pool must not be moved to NORMAL.
+		 */
+		if (srv_pool->map_update_state != MAP_UPDATE_STATE_DONE) {
+			pr_err("rmr_srv_send_map Map not updated succesfully\n");
+			err = -EINVAL;
+		} else if (cmd_msg->map_done_cmd.enable) {
+			if (rmr_srv_set_pool_state_normal(srv_pool) < 0)
+				err = -EINVAL;
+		}
+
+		/* The map update sequence ends here, whatever the outcome. */
+		srv_pool->map_update_state = MAP_UPDATE_STATE_DISABLED;
+		break;
+	case RMR_CMD_MAP_DISABLE:
+		/*
+		 * Something went wrong on the client side; we need to reset everything.
+		 */
+		pr_info("%s: RMR_CMD_MAP_DISABLE\n", __func__);
+
+		if (!cmd_msg->sync)
+			rmr_srv_change_pool_state(srv_pool, RMR_SRV_POOL_STATE_NO_IO);
+
+		srv_pool->map_update_state = MAP_UPDATE_STATE_DISABLED;
+		break;
+	case RMR_CMD_READ_MAP_BUF:
+		/*
+		 * Pserver wants to read our dirty map. So send it.
+		 */
+		pr_info("%s: RMR_CMD_READ_MAP_BUF\n", __func__);
+
+		rmr_srv_read_map_buf(pool, work_info->data, work_info->datalen,
+				     &cmd_msg->map_buf_cmd);
+
+		goto out_no_rsp;
+	case RMR_CMD_MAP_CHECK:
+		pr_debug("%s: RMR_CMD_MAP_CHECK\n", __func__);
+
+		if (atomic_read(&srv_pool->state) != RMR_SRV_POOL_STATE_NORMAL) {
+			pr_debug("srv pool %s is not in normal state, cannot do map check\n",
+				pool->poolname);
+			work_info->rsp->value = false;
+			break;
+		}
+		map = rmr_pool_find_map(pool, srv_pool->member_id);
+		if (!map) {
+			pr_err("pool %s no map found for member_id %u\n",
+			       pool->poolname, srv_pool->member_id);
+			err = -EINVAL;
+			goto out;
+		}
+		work_info->rsp->value = rmr_map_empty(map);
+		pr_debug("pool %s member_id %d rsp with map_empty=%llu\n",
+			 pool->poolname, srv_pool->member_id,
+			 work_info->rsp->value);
+
+		break;
+
+	case RMR_CMD_LAST_IO_TO_MAP:
+		/*
+		 * Use the last_io list, and add those IOs as dirty IDs to the map
+		 * for every other storage server other than this one.
+		 */
+		pr_info("%s: RMR_CMD_LAST_IO_TO_MAP\n", __func__);
+		err = rmr_srv_save_last_io_to_map(pool);
+		if (err) {
+			pr_err("rmr_srv_save_last_io_to_map failed\n");
+			goto out;
+		}
+
+		break;
+
+	case RMR_CMD_MAP_TEST:
+		/*
+		 * Received the map test from another node.
+		 * Check that we have everything that other node has.
+		 */
+		pr_info("%s: RMR_CMD_MAP_TEST\n", __func__);
+
+		err = rmr_pool_save_map(pool, work_info->data, work_info->datalen, true);
+		if (err)
+			pr_err("rmr_srv_save_map failed, test_only, err %d\n", err);
+
+		goto out_no_rsp;
+	case RMR_CMD_MD_SEND:
+		/*
+		 * Received the message to copy metadata of server pool to the sender.
+		 */
+		src_mapped_size = cmd_msg->md_send_cmd.src_mapped_size;
+		pr_debug("stg %u: receives md_update message from pool %u\n",
+			 srv_pool->member_id, cmd_msg->md_send_cmd.leader_id);
+
+		/*
+		 * Check the pool mapped_sizes are consistent or not.
+		 * NOTE(review): err is still 0 on this path, so a mismatch is answered
+		 * as success without any metadata in the buffer - confirm intended.
+		 */
+		if (pool->mapped_size && src_mapped_size && pool->mapped_size != src_mapped_size) {
+			pr_err_ratelimited("This %s mapped_size %llu != src %d mapped_size %llu\n",
+			       pool->poolname, pool->mapped_size, cmd_msg->md_send_cmd.leader_id,
+			       src_mapped_size);
+			goto out;
+		}
+
+		if (cmd_msg->md_send_cmd.read_full_md) {
+			if (work_info->datalen < sizeof(struct rmr_pool_md)) {
+				pr_err("%s: buffer too small for full pool_md (%zu < %zu)\n",
+				       __func__, work_info->datalen,
+				       sizeof(struct rmr_pool_md));
+				err = -EINVAL;
+				goto out;
+			}
+			memcpy(work_info->data, &pool->pool_md, sizeof(struct rmr_pool_md));
+		} else {
+			/* If updating buf incurs error, it simply waits for next md_update. */
+			rmr_srv_update_md_buf(srv_pool, work_info->data, work_info->datalen);
+		}
+
+		break;
+	case RMR_CMD_SEND_MD_BUF:
+		/*
+		 * Received the client pool metadata. Save it.
+		 */
+		sync = cmd_msg->send_md_buf_cmd.sync;
+		flags = cmd_msg->send_md_buf_cmd.flags;
+		if (flags == RMR_OP_MD_WRITE) {
+			err = rmr_srv_md_process_buf(pool, work_info->data, sync);
+			if (err) {
+				pr_err("rmr_srv_write_md failed\n");
+				goto out;
+			}
+
+			if (atomic_read(&srv_pool->store_state)) {
+				/* write back to disk */
+				err = process_md_io(pool, NULL, 0, work_info->datalen, flags,
+						    &pool->pool_md);
+				if (err) {
+					pr_err("Failed to process md io\n");
+					goto out;
+				}
+			}
+		}
+
+		if (!sync && flags == RMR_OP_MD_READ) {
+			/* Same bound as RMR_CMD_MD_SEND: never copy past the caller's buffer. */
+			if (work_info->datalen < sizeof(struct rmr_pool_md)) {
+				pr_err("%s: buffer too small for pool_md read (%zu < %zu)\n",
+				       __func__, work_info->datalen,
+				       sizeof(struct rmr_pool_md));
+				err = -EINVAL;
+				goto out;
+			}
+			memcpy(work_info->data, &pool->pool_md, sizeof(struct rmr_pool_md));
+		}
+
+		break;
+	case RMR_CMD_SEND_DISCARD:
+		/* Received the message to handle discards. */
+		pr_info("%s: RMR_CMD_SEND_DISCARD for srv %u\n",
+			__func__, cmd_msg->send_discard_cmd.member_id);
+		if (!cmd_msg->sync) {
+			err = rmr_pool_md_check_discard(pool, cmd_msg->send_discard_cmd.member_id);
+			if (err > 0) {
+				/* This node has received discards. */
+				err = 0;
+				pr_info("pool %s member_id %d has received discards\n",
+					pool->poolname, srv_pool->member_id);
+				goto out;
+			}
+		}
+
+		/*
+		 * For sync requests, even if the server that is not in normal state has received
+		 * the discard request, its dirty map is still outdated. However, non-sync
+		 * requests can overlook this check and proceed discarding directly.
+		 */
+		if (cmd_msg->sync && atomic_read(&srv_pool->state) != RMR_SRV_POOL_STATE_NORMAL) {
+			pr_err("srv pool %s not in normal state for sync discard request\n",
+				pool->poolname);
+			err = -EINVAL;
+			goto out;
+		}
+
+		err = rmr_srv_discard_id(pool, 0, 0, cmd_msg->send_discard_cmd.member_id,
+				cmd_msg->sync);
+		if (err)
+			pr_err("Failed to discard id\n");
+
+		break;
+	case RMR_CMD_STORE_CHECK:
+		pr_debug("%s: RMR_CMD_STORE_CHECK\n", __func__);
+
+		work_info->rsp->value = rmr_srv_pool_check_store(pool);
+		pr_debug("pool %s member_id %d rsp with value=%llu\n",
+			 pool->poolname, srv_pool->member_id,
+			 work_info->rsp->value);
+
+		break;
+	case RMR_CMD_MAP_GET_VER:
+		pr_debug("%s: RMR_CMD_MAP_GET_VER\n", __func__);
+
+		work_info->rsp->value = pool->map_ver;
+		pr_debug("pool %s member_id %d rsp with value=%llu\n",
+			 pool->poolname, srv_pool->member_id,
+			 work_info->rsp->value);
+
+		break;
+	case RMR_CMD_MAP_SET_VER:
+		pr_debug("%s: RMR_CMD_MAP_SET_VER\n", __func__);
+
+		pool->map_ver = work_info->cmd_msg->set_map_ver_cmd.map_ver;
+		rmr_srv_mark_pool_md_dirty(srv_pool);
+		break;
+	case RMR_CMD_DISCARD_CLEAR_FLAG:
+		pr_info("%s: RMR_CMD_DISCARD_CLEAR_FLAG\n", __func__);
+
+		md_i = rmr_pool_find_md(&pool->pool_md, cmd_msg->send_discard_cmd.member_id, false);
+		if (md_i < 0) {
+			pr_info("Didn't find md for member_id %u\n",
+				cmd_msg->send_discard_cmd.member_id);
+			goto out;
+		}
+
+		pool->pool_md.srv_md[md_i].discard_entries = false;
+		rmr_srv_flush_pool_md(srv_pool);
+		break;
+	case RMR_CMD_USER:
+		pr_debug("%s: RMR_CMD_USER\n", __func__);
+
+		/* The buffer belongs to the user command; never write the rsp header. */
+		err = process_msg_user_cmd(srv_pool, cmd_msg, work_info->data, work_info->datalen);
+		if (err)
+			pr_err("process_msg_user_cmd failed with err %d\n", err);
+
+		goto out_no_rsp;
+	default:
+		pr_warn("%s: switch default type: %d\n", __func__, cmd_msg->cmd_type);
+
+		err = -EINVAL;
+	}
+
+out:
+	work_info->rsp->err = err;
+	work_info->rsp->member_id = srv_pool->member_id;
+	work_info->rsp->cmd_type = cmd_msg->cmd_type;
+
+out_no_rsp:
+	// Should we return err in rdma_resp ?
+	pr_debug("send rtrs completion from msg_cmd_handler, err:%d\n", err);
+	rtrs_srv_resp_rdma(work_info->rtrs_op, err);
+
+	rmr_put_srv_pool(srv_pool);
+	kfree(work_info);
+}
+
+/**
+ * schedule_process_msg_cmd() - Defer an RMR_MSG_CMD message to a work item
+ *
+ * @srv_sess:	Session the command arrived on
+ * @rtrs_op:	rtrs operation to complete once the command has been handled
+ * @data:	Data buffer of the request; also used as the response buffer
+ * @datalen:	Length of @data
+ * @msg:	Command message, read as a struct rmr_msg_pool_cmd
+ * @len:	Length of @msg (not used here)
+ *
+ * Description:
+ *	Takes a reference on the srv pool addressed by the group_id of the message,
+ *	packs everything the handler needs into a struct rmr_cmd_work_info and
+ *	schedules process_msg_cmd_handler(). The handler drops the pool reference
+ *	and frees the work info. @msg is stored by pointer, not copied -
+ *	NOTE(review): presumably it stays valid until the rtrs op is completed; confirm.
+ *
+ * Return:
+ *	0 if the work was scheduled
+ *	Error from the pool lookup if the pool does not exist
+ *	-ENOMEM if the work info cannot be allocated
+ */
+static int schedule_process_msg_cmd(struct rmr_srv_sess *srv_sess,
+				    struct rtrs_srv_op *rtrs_op,
+				    void *data, size_t datalen,
+				    const void *msg, size_t len)
+{
+	const struct rmr_msg_pool_cmd *cmd_msg = msg;
+	struct rmr_cmd_work_info *info;
+	struct rmr_srv_pool *srv_pool;
+
+	pr_debug("pool %s received cmd %d\n",
+		 cmd_msg->pool_name, cmd_msg->cmd_type);
+
+	srv_pool = rmr_find_and_get_srv_pool(le32_to_cpu(cmd_msg->hdr.group_id));
+	if (IS_ERR(srv_pool)) {
+		pr_err("Cmd %s: pool %s does not exists: %pe\n",
+		       rmr_get_cmd_name(cmd_msg->cmd_type), cmd_msg->pool_name, srv_pool);
+		return PTR_ERR(srv_pool);
+	}
+
+	pr_debug("process_msg_cmd: pool %s found\n", cmd_msg->pool_name);
+
+	info = kzalloc(sizeof(*info), GFP_KERNEL);
+	if (!info) {
+		pr_err("failed to allocate work info to send map\n");
+		rmr_put_srv_pool(srv_pool);
+		return -ENOMEM;
+	}
+
+	/* Request context */
+	info->sess = srv_sess;
+	info->rtrs = srv_sess->rtrs;
+	info->rtrs_op = rtrs_op;
+	info->pool = srv_pool->pool;
+	info->cmd_msg = cmd_msg;
+
+	/* The response header and the payload share the same buffer. */
+	info->rsp = data;
+	info->data = data;
+	info->datalen = datalen;
+
+	INIT_WORK(&info->cmd_work, process_msg_cmd_handler);
+	schedule_work(&info->cmd_work);
+
+	return 0;
+}
+
+/**
+ * rmr_srv_rdma_ev() - rtrs rdma_ev callback, dispatches incoming rmr messages
+ *
+ * @priv:	Session private data, the struct rmr_srv_sess of the sender
+ * @id:		rtrs operation handle used to send the completion
+ * @data:	Data buffer of the request
+ * @datalen:	Length of @data
+ * @usr:	User message, starts with a struct rmr_msg_hdr
+ * @usrlen:	Length of @usr
+ *
+ * Description:
+ *	RMR_MSG_IO and RMR_MSG_CMD are handed to their handlers, which complete
+ *	the rtrs operation themselves. RMR_MSG_MAP_CLEAR and RMR_MSG_MAP_ADD are
+ *	handled inline and completed here with the handler's result.
+ *
+ * Return:
+ *	0 if the message was handled inline, or the handler's return value
+ *	-ENODEV if there is no session
+ *	-EINVAL if the message is shorter than a header or of unknown type
+ */
+static int rmr_srv_rdma_ev(void *priv, struct rtrs_srv_op *id,
+			   void *data, size_t datalen,
+			   const void *usr, size_t usrlen)
+{
+	struct rmr_srv_sess *srv_sess = priv;
+	const struct rmr_msg_hdr *hdr = usr;
+	int ret = 0;
+	u16 type;
+
+	if (unlikely(WARN_ON(!srv_sess)))
+		return -ENODEV;
+
+	/* The message comes from the network; do not read a header that is not there. */
+	if (unlikely(!usr || usrlen < sizeof(*hdr))) {
+		pr_err_ratelimited("Received truncated message (%zu bytes) from session %s\n",
+				   usrlen, srv_sess->sessname);
+		return -EINVAL;
+	}
+
+	type = le16_to_cpu(hdr->type);
+
+	switch (type) {
+	case RMR_MSG_IO:
+		return process_msg_io(srv_sess, id, data, datalen,
+				      usr, usrlen);
+	case RMR_MSG_MAP_CLEAR:
+		ret = process_msg_map_clear(srv_sess, usr);
+		break;
+	case RMR_MSG_MAP_ADD:
+		ret = process_msg_map_add(srv_sess, usr);
+		break;
+	case RMR_MSG_CMD:
+		return schedule_process_msg_cmd(srv_sess, id, data, datalen,
+						usr, usrlen);
+	default:
+		pr_warn("Received unexpected message type %d from session %s\n",
+			type, srv_sess->sessname);
+		return -EINVAL;
+	}
+
+	rtrs_srv_resp_rdma(id, ret);
+
+	return 0;
+}
+
+/**
+ * rmr_srv_check_params() - Check the parameters of the storage node
+ *
+ * @srv_pool:	The rmr srv pool to check parameters for
+ *
+ * Description:
+ *	Calls the get_params() op of the store attached to the pool, passing the
+ *	store's private data. A pool without a store is not checked.
+ *
+ * Return:
+ *	0 on success, or if no store is attached.
+ *	The error returned by get_params() otherwise.
+ */
+int rmr_srv_check_params(struct rmr_srv_pool *srv_pool)
+{
+	int err;
+
+	/* No store added to this server pool yet: nothing to check. */
+	if (!srv_pool->io_store)
+		return 0;
+
+	err = srv_pool->io_store->ops->get_params(srv_pool->io_store->priv);
+	if (!err)
+		return 0;
+
+	pr_err("%s: store get_params failed for pool %s, err %d\n",
+	       __func__, srv_pool->pool->poolname, err);
+	return err;
+}
+EXPORT_SYMBOL(rmr_srv_check_params);
+
+static struct rtrs_srv_ops rtrs_ops;
+
+/**
+ * rmr_srv_init_module() - Module entry point
+ *
+ * Description:
+ *	Validates the chunk_size module parameter, creates the slab caches for
+ *	requests and map entries, opens the rtrs server context and creates the
+ *	sysfs files. Anything set up before a failing step is torn down again.
+ *
+ * Return:
+ *	0 on success
+ *	-EINVAL for an invalid chunk_size
+ *	-ENOMEM if a cache cannot be created
+ *	Error from rtrs_srv_open() or rmr_srv_create_sysfs_files() otherwise
+ */
+static int __init rmr_srv_init_module(void)
+{
+	int err;
+
+	BUILD_BUG_ON(PAGE_SIZE / sizeof(struct rmr_map_cbuf_hdr) < RMR_POOL_MAX_SESS);
+
+	if (chunk_size < MIN_CHUNK_SIZE || chunk_size > MAX_CHUNK_SIZE ||
+	    !is_power_of_2(chunk_size)) {
+		pr_err("Loading module %s failed. Invalid chunk_size %u\n",
+			KBUILD_MODNAME, chunk_size);
+		pr_err("Chunk size should be a power of 2, and between (min %u - max %u)\n",
+			MIN_CHUNK_SIZE, MAX_CHUNK_SIZE);
+		return -EINVAL;
+	}
+
+	pr_info("Loading module %s, version %s, proto %s, chunk_size %u\n",
+		KBUILD_MODNAME, RMR_VER_STRING, RMR_PROTO_VER_STRING, chunk_size);
+
+	rtrs_ops = (struct rtrs_srv_ops){
+		.rdma_ev = rmr_srv_rdma_ev,
+		.link_ev = rmr_srv_link_ev,
+	};
+
+	rmr_req_cachep = kmem_cache_create("rmr_req_cachep", sizeof(struct rmr_srv_req),
+					   0, 0, NULL);
+	if (!rmr_req_cachep) {
+		pr_err("can not allocagte cachep for rmr_req\n");
+		return -ENOMEM;
+	}
+
+	rmr_map_entry_cachep = kmem_cache_create("rmr_map_entry_cachep",
+						 sizeof(struct rmr_map_entry),
+						 0, 0, NULL);
+	if (!rmr_map_entry_cachep) {
+		pr_err("can not allocagte cachep for rmr_map_entry\n");
+		err = -ENOMEM;
+		goto err_destroy_req_cache;
+	}
+
+	rtrs_ctx = rtrs_srv_open(&rtrs_ops, RTRS_PORT);
+	if (IS_ERR(rtrs_ctx)) {
+		err = PTR_ERR(rtrs_ctx);
+		pr_err("rtrs_srv_open(), err: %pe\n", rtrs_ctx);
+		goto err_destroy_map_cache;
+	}
+
+	err = rmr_srv_create_sysfs_files();
+	if (!err)
+		return 0;
+
+	pr_err("rmr_srv_create_sysfs_files(), err: %d\n", err);
+
+	/* Unwind in reverse order of setup. */
+	rtrs_srv_close(rtrs_ctx);
+err_destroy_map_cache:
+	kmem_cache_destroy(rmr_map_entry_cachep);
+err_destroy_req_cache:
+	kmem_cache_destroy(rmr_req_cachep);
+	return err;
+}
+
+/**
+ * rmr_srv_cleanup_module() - Module exit point
+ *
+ * Description:
+ *	Closes the rtrs server context, destroys every pool still on pool_list
+ *	together with its sysfs files, removes the module sysfs files and finally
+ *	destroys both slab caches created at module init.
+ *
+ *	The caches are destroyed last, after rtrs is closed and the pools are
+ *	gone, so that nothing can still allocate from or free to them.
+ */
+static void __exit rmr_srv_cleanup_module(void)
+{
+	struct rmr_pool *pool, *tmp;
+	struct rmr_srv_pool *srv_pool;
+
+	pr_info("Unloading module\n");
+
+	rtrs_srv_close(rtrs_ctx);
+
+	list_for_each_entry_safe(pool, tmp, &pool_list, entry) {
+		srv_pool = (struct rmr_srv_pool *)pool->priv;
+
+		/* All sessions are expected to be gone once rtrs is closed. */
+		WARN_ON(!list_empty(&pool->sess_list));
+		rmr_srv_destroy_pool(pool);
+		rmr_srv_destroy_pool_sysfs_files(pool, NULL);
+		rmr_put_srv_pool(srv_pool);
+	}
+
+	rmr_srv_destroy_sysfs_files();
+
+	/* Mirror of the init path: both caches created there are released here. */
+	kmem_cache_destroy(rmr_map_entry_cachep);
+	kmem_cache_destroy(rmr_req_cachep);
+
+	pr_info("Module unloaded\n");
+}
+
+module_init(rmr_srv_init_module);	/* run rmr_srv_init_module() on module load */
+module_exit(rmr_srv_cleanup_module);	/* run rmr_srv_cleanup_module() on module unload */
-- 
2.43.0


  parent reply	other threads:[~2026-05-05  7:47 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-05-05  7:46 [LSF/MM/BPF RFC PATCH 00/13] Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 01/13] RDMA/rmr: add public and private headers Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 02/13] RDMA/rmr: add shared library code (pool, map, request) Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 03/13] RDMA/rmr: client: main functionality Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 04/13] RDMA/rmr: client: sysfs interface functions Md Haris Iqbal
2026-05-05  7:46 ` Md Haris Iqbal [this message]
2026-05-05  7:46 ` [PATCH 06/13] RDMA/rmr: server: " Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 07/13] RDMA/rmr: include client and server modules into kernel compilation Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 08/13] block/brmr: add private headers with brmr protocol structs and helpers Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 09/13] block/brmr: client: main functionality Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 10/13] block/brmr: client: sysfs interface functions Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 11/13] block/brmr: server: main functionality Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 12/13] block/brmr: server: sysfs interface functions Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 13/13] block/brmr: include client and server modules into kernel compilation Md Haris Iqbal

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260505074644.195453-6-haris.iqbal@ionos.com \
    --to=haris.iqbal@ionos.com \
    --cc=axboe@kernel.dk \
    --cc=bvanassche@acm.org \
    --cc=hch@lst.de \
    --cc=jgg@ziepe.ca \
    --cc=jia.li@ionos.com \
    --cc=jinpu.wang@ionos.com \
    --cc=leon@kernel.org \
    --cc=linux-block@vger.kernel.org \
    --cc=linux-kernel@vger.kernel.org \
    --cc=linux-rdma@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox