Linux RDMA and InfiniBand development
 help / color / mirror / Atom feed
From: Md Haris Iqbal <haris.iqbal@ionos.com>
To: linux-block@vger.kernel.org, linux-rdma@vger.kernel.org
Cc: linux-kernel@vger.kernel.org, axboe@kernel.dk,
	bvanassche@acm.org, hch@lst.de, jgg@ziepe.ca, leon@kernel.org,
	jinpu.wang@ionos.com, Md Haris Iqbal <haris.iqbal@ionos.com>,
	Jia Li <jia.li@ionos.com>
Subject: [PATCH 02/13] RDMA/rmr: add shared library code (pool, map, request)
Date: Tue,  5 May 2026 09:46:14 +0200	[thread overview]
Message-ID: <20260505074644.195453-3-haris.iqbal@ionos.com> (raw)
In-Reply-To: <20260505074644.195453-1-haris.iqbal@ionos.com>

Add the three source files that provide functionality shared by both
the RMR client and the RMR server:

  rmr-pool.c	pool refcounting, lookup and lifecycle helpers used
		by both client and server pool implementations.
  rmr-map.c	dirty-map data structure used to track which blocks
		have not yet been replicated to a given pool member.
  rmr-req.c	server-side request infrastructure that submits an
		I/O to an upper-layer store via struct
		rmr_srv_store_ops and propagates the completion back
		into RMR.

These files are not compiled until the modules are wired into the
build in a later patch in this series.

Signed-off-by: Md Haris Iqbal <haris.iqbal@ionos.com>
Signed-off-by: Jia Li <jia.li@ionos.com>
---
 drivers/infiniband/ulp/rmr/rmr-map.c  | 904 ++++++++++++++++++++++++++
 drivers/infiniband/ulp/rmr/rmr-pool.c | 401 ++++++++++++
 drivers/infiniband/ulp/rmr/rmr-req.c  | 796 +++++++++++++++++++++++
 3 files changed, 2101 insertions(+)
 create mode 100644 drivers/infiniband/ulp/rmr/rmr-map.c
 create mode 100644 drivers/infiniband/ulp/rmr/rmr-pool.c
 create mode 100644 drivers/infiniband/ulp/rmr/rmr-req.c

diff --git a/drivers/infiniband/ulp/rmr/rmr-map.c b/drivers/infiniband/ulp/rmr/rmr-map.c
new file mode 100644
index 000000000000..f4b7dd7c3b50
--- /dev/null
+++ b/drivers/infiniband/ulp/rmr/rmr-map.c
@@ -0,0 +1,904 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Reliable multicast over RTRS (RMR)
+ *
+ * Copyright (c) 2026 IONOS SE
+ */
+
+#include <linux/slab.h>
+
+#include "rmr-map.h"
+#include "rmr-pool.h"
+
+void rmr_map_update_page_params(struct rmr_dirty_id_map *map)
+{
+	unsigned long remaining_chunks;
+
+	map->no_of_flp = (map->no_of_chunks >> CHUNKS_PER_FLP_LOG2);
+
+	/*
+	 * If the number of chunks are not completely filling an FLP (CHUNKS_PER_FLP),
+	 * then the remaining would be tracked by the next FLP. Thus the next FLP would
+	 * have unused SLP pointers. We will calculate the number of SLP slots which will
+	 * be used in the last FLP.
+	 */
+	remaining_chunks = map->no_of_chunks & (CHUNKS_PER_FLP - 1);
+	if (!remaining_chunks) {
+		/*
+		 * If there are no remaining chunks, then the last FLP is completely full.
+		 */
+		map->no_of_slp_in_last_flp = NO_OF_SLP_PER_FLP;
+		map->no_of_chunk_in_last_slp = NO_OF_CHUNKS_PER_PAGE;
+	} else {
+		/*
+		 * If there are remaining chunks, then we add another FLP for it.
+		 * This FLP will not be full, hence we calculate the number of SLP slots
+		 * that will be used.
+		 */
+		map->no_of_flp += 1;
+		map->no_of_slp_in_last_flp = (remaining_chunks >> CHUNKS_PER_SLP_LOG2);
+
+		/*
+		 * Same as above. It could be that the number of chunks do not fit neatly
+		 * in the last SLP (CHUNKS_PER_SLP), and the remaining ones end up in the
+		 * SLP with remaining chunk slots.
+		 */
+		remaining_chunks &= (CHUNKS_PER_SLP - 1);
+		if (!remaining_chunks) {
+			/*
+			 * If there are no remaining chunks, then the last SLP is completely full.
+			 */
+			map->no_of_chunk_in_last_slp = CHUNKS_PER_SLP;
+		} else {
+			/*
+			 * If there are remaining chunks, then we add another SLP.
+			 */
+			map->no_of_slp_in_last_flp += 1;
+			map->no_of_chunk_in_last_slp = remaining_chunks;
+		}
+	}
+
+	map->total_slp = ((map->no_of_flp - 1) * NO_OF_SLP_PER_FLP) + map->no_of_slp_in_last_flp;
+}
+
+static void rmr_map_update_map_params(struct rmr_pool *pool, struct rmr_dirty_id_map *map)
+{
+	map->no_of_chunks = pool->no_of_chunks;
+
+	rmr_map_update_page_params(map);
+
+	pr_info("%s: Chunks info %u, %u, %u, %llu\n",
+		__func__, pool->chunk_size, ilog2(pool->chunk_size),
+		pool->chunk_size_shift, map->no_of_chunks);
+	pr_info("%s: FLPs %llu, SLPs in last FLP %llu, Total SLPs %llu, chunks in last SLP %llu\n",
+		__func__, map->no_of_flp, map->no_of_slp_in_last_flp, map->total_slp,
+		map->no_of_chunk_in_last_slp);
+	pr_info("%s: Dirty map size %lldB\n", __func__, (map->total_slp * PAGE_SIZE));
+}
+
+static int rmr_map_allocate_pages(struct rmr_pool *pool, struct rmr_dirty_id_map *map)
+{
+	el_flp *flp_ptr;
+	u64 no_of_slps;
+	int i, j;
+
+	for (i = 0; i < map->no_of_flp;) {
+		map->dirty_bitmap[i] = (void *)get_zeroed_page(GFP_KERNEL);
+		if (!map->dirty_bitmap[i])
+			goto err_alloc;
+		flp_ptr = (el_flp *)map->dirty_bitmap[i];
+
+		if (i == (map->no_of_flp - 1))
+			no_of_slps = map->no_of_slp_in_last_flp;
+		else
+			no_of_slps = NO_OF_SLP_PER_FLP;
+
+		/*
+		 * Move the increment to here, so that later in err_alloc: if we have to free,
+		 * the index i, is pointing in the correct position.
+		 */
+		i++;
+
+		for (j = 0; j < no_of_slps; j++, flp_ptr++) {
+			*flp_ptr = get_zeroed_page(GFP_KERNEL);
+			if (!*flp_ptr)
+				goto err_alloc;
+		}
+	}
+
+	// TODO remove this
+	map->bitmap_filter = kcalloc(pool->no_of_chunks, sizeof(*map->bitmap_filter), GFP_KERNEL);
+	if (!map->bitmap_filter)
+		goto err_alloc;
+
+	return 0;
+
+err_alloc:
+	for (--i; i >= 0; i--) {
+		flp_ptr = (el_flp *)map->dirty_bitmap[i];
+
+		for (--j; j >= 0; j--)
+			free_page((unsigned long)*(flp_ptr + j));
+
+		j = NO_OF_SLP_PER_FLP;
+		free_page((unsigned long)map->dirty_bitmap[i]);
+	}
+
+	return -ENOMEM;
+}
+
+struct rmr_dirty_id_map *rmr_map_create(struct rmr_pool *pool, u8 member_id)
+{
+	struct rmr_dirty_id_map *map = NULL;
+	int ret;
+
+	pr_info("%s: Creating map for member_id %u, in pool %s. Existing map_cnt %u\n",
+		__func__, member_id, pool->poolname, pool->maps_cnt);
+
+	if (!pool->no_of_chunks) {
+		pr_err("%s: dirty map size cannot be zero\n", __func__);
+		return ERR_PTR(-EINVAL);
+	}
+
+	mutex_lock(&pool->maps_lock);
+
+	/*
+	 * Don't create if already exists
+	 */
+	map = rmr_pool_find_map(pool, member_id);
+	if (map != NULL) {
+		pr_err("Map with member_id %u already exists\n", member_id);
+		ret = -EEXIST;
+		goto err_unlock;
+	}
+
+	if (pool->maps_cnt >= RMR_POOL_MAX_SESS) {
+		pr_err("pool %s can not create new map, max number of sessions %d achieved\n",
+		       pool->poolname, RMR_POOL_MAX_SESS);
+		ret = -EINVAL;
+		goto err_unlock;
+	}
+
+	/*
+	 * Allocate memory and init the structure
+	 */
+	map = (struct rmr_dirty_id_map *)get_zeroed_page(GFP_KERNEL);
+	if (!map) {
+		pr_err("cannot allocate map for member_id %u\n", member_id);
+		ret = -ENOMEM;
+		goto err_unlock;
+	}
+	rmr_map_update_map_params(pool, map);
+
+	ret = rmr_map_allocate_pages(pool, map);
+	if (ret) {
+		pr_err("cannot allocate memory for member_id %u\n", member_id);
+		goto err_map;
+	}
+
+	xa_init_flags(&map->rmr_id_map, XA_FLAGS_ALLOC);
+	map->member_id = member_id;
+	map->ts = jiffies;
+
+	rmr_pool_maps_append(pool, map);
+
+	mutex_unlock(&pool->maps_lock);
+
+	return map;
+
+err_map:
+	free_page((unsigned long)map);
+err_unlock:
+	mutex_unlock(&pool->maps_lock);
+	return ERR_PTR(ret);
+}
+
+void rmr_map_destroy(struct rmr_dirty_id_map *map)
+{
+	el_flp *flp_ptr;
+	int i, j;
+	u64 no_of_slps;
+
+	WARN_ON(!xa_empty(&map->rmr_id_map));
+	map->ts = jiffies;
+
+	pr_info("%s: member_id %u\n", __func__, map->member_id);
+	kfree(map->bitmap_filter);
+
+	for (i = 0; i < map->no_of_flp; i++) {
+		flp_ptr = (el_flp *)map->dirty_bitmap[i];
+
+		if (i == (map->no_of_flp - 1))
+			no_of_slps = map->no_of_slp_in_last_flp;
+		else
+			no_of_slps = NO_OF_SLP_PER_FLP;
+
+		for (j = 0; j < no_of_slps; j++)
+			free_page((unsigned long)*(flp_ptr + j));
+
+		free_page((unsigned long)map->dirty_bitmap[i]);
+	}
+
+	free_page((unsigned long)map);
+}
+
+/**
+ * rmr_map_calc_chunk -	Calculate chunk number from offset and length of IO
+ *
+ * @pool:		The pool
+ * @offset:		Offset of the IO
+ * @length:		Length of the IO
+ * @id:			rmr_id_t where to populate the chunk details
+ *			id.b: chunk number denoted by this entry
+ *			id.a: Number of chunks dirty starting (and including) id.b
+ *
+ *			For example:
+ *			if id.a is 1, only id.b is dirty.
+ *			if id.a is 2, id.b and (id.b+1) is dirty
+ *
+ * Context:
+ *	srcu pool->map_srcu should be held while calling this function.
+ */
+void rmr_map_calc_chunk(struct rmr_pool *pool, size_t offset, size_t length, rmr_id_t *id)
+{
+	u64 off_len = offset + length;
+
+	id->b = GET_CHUNK_NUMBER(offset, pool->chunk_size_shift);
+	id->a = GET_FOLLOWING_CHUNKS(off_len, pool->chunk_size_shift, id->b);
+}
+
+/**
+ * rmr_get_chunk_md_from_id - Get the chunk metadata byte from rmr_id_t
+ *
+ * @map:	The map to work on
+ * @id:		rmr_id_t to use to get the chunk metadata byte
+ *
+ * Context:
+ *	srcu pool->map_srcu should be held while calling this function.
+ */
+inline u8 *rmr_get_chunk_md_from_id(struct rmr_dirty_id_map *map, rmr_id_t id)
+{
+	unsigned long idb_slp, idb_slp_index, idb_chunk;
+	el_flp *flp_ptr;
+	u8 *slp, *chunk_md;
+
+	/*
+	 * First get the pointer to first level page (FLP).
+	 * To get that, we need to find which first level page the chunk belongs, and it can
+	 * be found by dividing the chunk number by the maximum number of chunks 1 FLP can track.
+	 *
+	 * After that we need to adjust the id.b to go one level down. This is because we just
+	 * moved to the desired FLP, and hence that portion of id.b can be dropped.
+	 * For this we do the modulo with CHUNKS_PER_FLP.
+	 */
+	flp_ptr = (el_flp *)(map->dirty_bitmap[id.b >> CHUNKS_PER_FLP_LOG2]);
+	idb_slp = id.b & (CHUNKS_PER_FLP - 1);
+
+	/*
+	 * Now we need to move to the second level page (SLP).
+	 * The addresses to SLPs are stored in the FLP as a list of addresses. Hence we calculate
+	 * the desired slp index which has the address to the SLP our chunk md resides in.
+	 *
+	 * We then adjust our flp_ptr according to the index.
+	 * Note that flp_ptr is of type el_flp (flp element), which is unsigned long, since
+	 * addresses are of that data type. This lets us move to the slp index easily.
+	 */
+	idb_slp_index = idb_slp >> CHUNKS_PER_SLP_LOG2;
+	flp_ptr += idb_slp_index;
+
+	/*
+	 * The location pointed by flp_ptr is storing the address to the SLP we want to move to.
+	 * So we dereference it first, and then cast it to relevant pointer (to the chunk metadata
+	 * data type, which is u8).
+	 *
+	 * The last step it to move to the correct chunk metadata in the SLP.
+	 *
+	 * Each SLP can store metadata for CHUNKS_PER_SLP chunks. So we adjust the idb_slp
+	 * accordingly. And then move our slp pointer to the correct chunk metadata byte.
+	 */
+	slp = (u8 *)(*flp_ptr);
+	idb_chunk = idb_slp & (CHUNKS_PER_SLP - 1);
+	chunk_md = slp + idb_chunk;
+
+	return chunk_md;
+}
+
+static bool rmr_chunk_md_check_dirty(u8 *chunk_md)
+{
+	return (*chunk_md) & (0x1 << CHUNK_DIRTY_BIT);
+}
+
+static void rmr_chunk_md_set_dirty(u8 *chunk_md)
+{
+	*chunk_md |= (0x1 << CHUNK_DIRTY_BIT);
+}
+
+static void rmr_chunk_md_unset_dirty(u8 *chunk_md)
+{
+	*chunk_md &= ~(0x1 << CHUNK_DIRTY_BIT);
+}
+
+/**
+ * rmr_map_set_dirty -	Set bits from rmr_id_t
+ *
+ * @map:		Map to work on
+ * @id:			rmr_id_t containing the chunk info
+ *			id.b: chunk number denoted by this entry
+ *			id.a: Number of chunks dirty starting (and including) id.b
+ * @filter:		Filter to add to entry
+ *
+ *
+ * Context:
+ *	srcu pool->map_srcu should be held while calling this function.
+ */
+inline void rmr_map_set_dirty(struct rmr_dirty_id_map *map, rmr_id_t id, u8 filter)
+{
+	u8 *chunk_md;
+	u64 i;
+
+	map->ts = jiffies;
+
+	chunk_md = rmr_get_chunk_md_from_id(map, id);
+	for (i = 0; i < id.a; i++) {
+		rmr_chunk_md_set_dirty(chunk_md);
+		chunk_md++;
+	}
+}
+
+inline void rmr_map_set_dirty_all(struct rmr_dirty_id_map *map, u8 filter)
+{
+	el_flp *flp_ptr;
+	u64 no_of_slps, no_of_chunks;
+	bool is_last_flp;
+	u8 *slp;
+	int i, j, k;
+
+	for (i = 0; i < map->no_of_flp; i++) {
+		flp_ptr = (el_flp *)map->dirty_bitmap[i];
+		is_last_flp = (i == (map->no_of_flp - 1));
+
+		if (is_last_flp)
+			no_of_slps = map->no_of_slp_in_last_flp;
+		else
+			no_of_slps = NO_OF_SLP_PER_FLP;
+
+		for (j = 0; j < no_of_slps; j++, flp_ptr++) {
+			slp = (u8 *)(*flp_ptr);
+
+			if (is_last_flp && j == (no_of_slps - 1))
+				no_of_chunks = map->no_of_chunk_in_last_slp;
+			else
+				no_of_chunks = NO_OF_CHUNKS_PER_PAGE;
+
+			for (k = 0; k < no_of_chunks; k++, slp++)
+				rmr_chunk_md_set_dirty(slp);
+		}
+	}
+}
+
+/**
+ * rmr_map_unset_dirty - Clear bits from rmr_id_t, and free entry if any
+ *
+ * @map:		Map to work on
+ * @id:			rmr_id_t containing the chunk info
+ *			id.b: chunk number denoted by this entry
+ *			id.a: Number of chunks dirty starting (and including) id.b
+ * @filter:		Filter to add to entry
+ *
+ * Description:
+ *	This version can be used by both client and server.
+ *	If entry is found, the function frees it.
+ *	Clears the bit using info from the given rmr_id_t
+ *
+ * Context:
+ *	srcu pool->map_srcu should be held while calling this function.
+ */
+inline struct rmr_map_entry *rmr_map_unset_dirty(struct rmr_dirty_id_map *map, rmr_id_t id,
+						 u8 filter)
+{
+	struct rmr_map_entry *entry;
+	u8 *chunk_md;
+	u64 i;
+
+	map->ts = jiffies;
+
+	chunk_md = rmr_get_chunk_md_from_id(map, id);
+	BUG_ON(!chunk_md);
+	for (i = 0; i < id.a; i++) {
+		rmr_chunk_md_unset_dirty(chunk_md);
+		chunk_md++;
+	}
+
+	entry = xa_erase(&map->rmr_id_map, rmr_id_to_key(id));
+	if (!entry) {
+		pr_debug("in the member_id %d there is no entry for id [%llu, %llu]\n",
+			 map->member_id, id.a, id.b);
+	}
+
+	return entry;
+}
+
+/*
+ * rmr_map_check_dirty - Check if the following bits are set or not
+ *
+ * @map:		Map to work on
+ * @id:			rmr_id_t containing the chunk info
+ *			id.b: chunk number denoted by this entry
+ *			id.a: Number of chunks dirty starting (and including) id.b
+ *
+ * Context:
+ *	srcu pool->map_srcu should be held while calling this function.
+ */
+inline bool rmr_map_check_dirty(struct rmr_dirty_id_map *map, rmr_id_t id)
+{
+	u8 *chunk_md;
+
+	chunk_md = rmr_get_chunk_md_from_id(map, id);
+	return rmr_chunk_md_check_dirty(chunk_md);
+}
+
+/**
+ * rmr_map_get_dirty_entry - Check and return entry if the following bits are set
+ *
+ * @map:		Map to work on
+ * @id:			rmr_id_t containing the chunk info
+ *			id.b: chunk number denoted by this entry
+ *			id.a: Number of chunks dirty starting (and including) id.b
+ *
+ * Description:
+ *	Check if a chunk is dirty or not.
+ *	If the particular chunk is dirty, then create an entry for it and return back.
+ *
+ * Context:
+ *	srcu pool->map_srcu should be held while calling this function.
+ */
+inline struct rmr_map_entry *rmr_map_get_dirty_entry(struct rmr_dirty_id_map *map, rmr_id_t id)
+{
+	struct rmr_map_entry *entry;
+	int err;
+
+	if (rmr_map_check_dirty(map, id)) {
+		entry = xa_load(&map->rmr_id_map, rmr_id_to_key(id));
+		if (entry) {
+			pr_debug("%s: For id [%llu, %llu], entry exists member_id %u\n",
+				 __func__, id.a, id.b, map->member_id);
+			return entry;
+		}
+
+		entry = kmem_cache_zalloc(rmr_map_entry_cachep, GFP_KERNEL);
+		if (!entry) {
+			pr_err("%s: Cannot allocate entry for member_id %d, id [[%llu, %llu]]\n",
+			       __func__, map->member_id, id.a, id.b);
+			return ERR_PTR(-ENOMEM);
+		}
+
+		atomic_set(&entry->sync_cnt, -1);
+		init_llist_head(&entry->wait_list);
+
+		err = xa_insert(&map->rmr_id_map, rmr_id_to_key(id), entry, GFP_KERNEL);
+		if (err == 0)
+			return entry;
+
+		kmem_cache_free(rmr_map_entry_cachep, entry);
+
+		if (err == -EBUSY)
+			return xa_load(&map->rmr_id_map, rmr_id_to_key(id));
+		else
+			return ERR_PTR(-ENOMEM);
+	}
+
+	return NULL;
+}
+
+/**
+ * rmr_map_clear_filter_all - Clear filter for entire bitmap
+ *
+ * @map:       Map to work on
+ * @filter:    Filter to be cleared
+ *
+ * Context:
+ *	srcu pool->map_srcu should be held while calling this function.
+ */
+inline void rmr_map_clear_filter_all(struct rmr_dirty_id_map *map, u8 filter)
+{
+	u64 i;
+
+	for (i = 0; i < map->no_of_chunks; i++)
+		map->bitmap_filter[i] &= ~filter;
+}
+
+/**
+ * rmr_map_unset_dirty_all - Clear all chunk bits (the entire map)
+ *
+ * @map:       Map to work on
+ *
+ * Context:
+ *	srcu pool->map_srcu should be held while calling this function.
+ */
+inline void rmr_map_unset_dirty_all(struct rmr_dirty_id_map *map)
+{
+	rmr_id_t id;
+	u64 i;
+
+	/*
+	 * TODO: memcpy zeroes or something faster
+	 */
+
+	id.a = 1;
+	for (i = 0; i < map->no_of_chunks; i++) {
+		id.b = i;
+
+		if (!rmr_map_check_dirty(map, id))
+			continue;
+
+		rmr_map_unset_dirty(map, id, MAP_NO_FILTER);
+	}
+
+	rmr_map_clear_filter_all(map, MAP_ENTRY_UNSYNCED);
+}
+
+/**
+ * rmr_map_empty - Check if there are any chunks dirty
+ *
+ * @map:       Map to work on
+ *
+ * Return:
+ *	True:	If map is empty
+ *	False:	Otherwise
+ *
+ * Context:
+ *	srcu pool->map_srcu should be held while calling this function.
+ */
+inline bool rmr_map_empty(struct rmr_dirty_id_map *map)
+{
+	el_flp *flp_ptr;
+	u64 no_of_slps, no_of_chunks;
+	bool is_last_flp;
+	u8 *slp;
+	int i, j, k;
+
+	for (i = 0; i < map->no_of_flp; i++) {
+		flp_ptr = (el_flp *)map->dirty_bitmap[i];
+		is_last_flp = (i == (map->no_of_flp - 1));
+
+		if (is_last_flp)
+			no_of_slps = map->no_of_slp_in_last_flp;
+		else
+			no_of_slps = NO_OF_SLP_PER_FLP;
+
+		for (j = 0; j < no_of_slps; j++, flp_ptr++) {
+			slp = (u8 *)(*flp_ptr);
+
+			if (is_last_flp && j == (no_of_slps - 1))
+				no_of_chunks = map->no_of_chunk_in_last_slp;
+			else
+				no_of_chunks = NO_OF_CHUNKS_PER_PAGE;
+
+			for (k = 0; k < no_of_chunks; k++, slp++) {
+				if (rmr_chunk_md_check_dirty(slp))
+					return false;
+			}
+		}
+	}
+
+	return true;
+}
+
+inline void rmr_map_bitwise_or_buf(void *dst_buf, void *src_buf, u32 buf_size)
+{
+	u8 *src_byte, *dst_byte;
+
+	src_byte = src_buf;
+	dst_byte = dst_buf;
+
+	while (buf_size--)
+		*(dst_byte + buf_size) |= *(src_byte + buf_size);
+}
+
+inline int rmr_map_create_entries(struct rmr_dirty_id_map *map)
+{
+	struct rmr_map_entry *entry;
+	rmr_id_t id;
+	int err;
+	u64 i;
+
+	id.a = 1;
+	for (i = 0; i < map->no_of_chunks; i++) {
+		id.b = i;
+
+		if (!rmr_map_check_dirty(map, id))
+			continue;
+
+		if (xa_load(&map->rmr_id_map, rmr_id_to_key(id)))
+			continue;
+
+		entry = kmem_cache_zalloc(rmr_map_entry_cachep, GFP_KERNEL);
+		if (!entry) {
+			pr_err("%s: Cannot allocate entry for member_id %d, chunk %llu\n",
+			       __func__, map->member_id, i);
+			return -ENOMEM;
+		}
+
+		atomic_set(&entry->sync_cnt, -1);
+		init_llist_head(&entry->wait_list);
+
+		pr_debug("%s: Adding entry %p for chunk %llu\n",
+			 __func__, entry, i);
+
+		err = xa_insert(&map->rmr_id_map, rmr_id_to_key(id), entry, GFP_KERNEL);
+		if (err) {
+			pr_err("%s: Cannot insert entry for member_id %d, chunk %llu\n",
+			       __func__, map->member_id, i);
+			return err;
+		}
+	}
+
+	return 0;
+}
+
+/**
+ * rmr_map_slps_to_buf - Copy SLPs to given buf
+ *
+ * @map:	Map to work on
+ * @slp_idx:	SLP number to start copying from
+ * @no_of_slp:	Number of SLPs to copy
+ * @buf:	Buffer to copy SLPs to
+ *
+ * Context:
+ *     srcu pool->map_srcu should be held while calling this function.
+ */
+void rmr_map_slps_to_buf(struct rmr_dirty_id_map *map, u64 slp_idx, u64 no_of_slp, u8 *buf)
+{
+	el_flp *flp_ptr;
+	u64 slp_no, flp_no, i = 0;
+	void *slp;
+
+	flp_no = slp_idx >> NO_OF_SLP_PER_FLP_LOG2;
+	slp_no = slp_idx & (NO_OF_SLP_PER_FLP - 1);
+
+	flp_ptr = (el_flp *)map->dirty_bitmap[flp_no];
+	while (i < no_of_slp) {
+		slp = (void *)(*(flp_ptr + slp_no));
+
+		memcpy(buf, slp, PAGE_SIZE);
+		buf += PAGE_SIZE;
+
+		slp_no++;
+		if (slp_no >= NO_OF_SLP_PER_FLP) {
+			flp_no += 1;
+			slp_no = 0;
+
+			flp_ptr = (el_flp *)map->dirty_bitmap[flp_no];
+		}
+
+		i++;
+	}
+
+	return;
+}
+
+/**
+ * rmr_map_buf_to_slps - Copy data from buf to SLPs
+ *
+ * @map:	Map to work on
+ * @buf:	Buffer from which to copy data
+ * @buf_size:	Buffer size
+ * @slp_idx:	SLP number to start copying to
+ * @test:	Whether to compare data or copy
+ *
+ * Return:
+ *	Number of SLPs to which data was copied.
+ *	0 in case of failure.
+ *
+ * Context:
+ *     srcu pool->map_srcu should be held while calling this function.
+ */
+u64 rmr_map_buf_to_slps(struct rmr_dirty_id_map *map, u8 *buf, u32 buf_size, u64 slp_idx,
+			bool test)
+{
+	el_flp *flp_ptr;
+	u64 slp_no, flp_no, i = 0;
+	u64 no_of_slp;
+	void *slp;
+
+	/*
+	 * The buf_size should be a factor of PAGE_SIZE
+	 */
+	if (buf_size % PAGE_SIZE) {
+		pr_info("%s: Failed %u\n", __func__, buf_size);
+		return 0;
+	}
+
+	no_of_slp = buf_size >> PAGE_SHIFT;
+
+	flp_no = slp_idx >> NO_OF_SLP_PER_FLP_LOG2;
+	slp_no = slp_idx & (NO_OF_SLP_PER_FLP - 1);
+
+	pr_info("%s: no_of_slp=%llu, flp_no=%llu, slp_no=%llu, slp_idx=%llu\n",
+		__func__, no_of_slp, flp_no, slp_no, slp_idx);
+	flp_ptr = (el_flp *)map->dirty_bitmap[flp_no];
+	while (i < no_of_slp) {
+		slp = (void *)(*(flp_ptr + slp_no));
+
+		if (test && memcmp(slp, buf, PAGE_SIZE)) {
+			pr_info("%s: Compare failed\n", __func__);
+			return 0;
+		} else if (!test) {
+			memcpy(slp, buf, PAGE_SIZE);
+		}
+		buf += PAGE_SIZE;
+
+		slp_no++;
+		if (slp_no >= NO_OF_SLP_PER_FLP) {
+			flp_no += 1;
+			slp_no = 0;
+
+			flp_ptr = (el_flp *)map->dirty_bitmap[flp_no];
+		}
+
+		i++;
+	}
+
+	return no_of_slp;
+}
+
+void rmr_map_hexdump_bitmap_buf(u8 member_id, void *buf, u32 buf_size)
+{
+	u8 *buf_byte;
+	u32 size = 0;
+
+	buf_byte = buf;
+
+	pr_info("%s: Starting bitmap dump for member %u in hex, size %u\n",
+		__func__, member_id, buf_size);
+	pr_info("---------------------------------------------------------\n");
+	while (size < buf_size) {
+		pr_cont("%02X", *(buf_byte + size));
+		size++;
+	}
+
+	pr_info("\n");
+}
+
+void rmr_map_dump_bitmap(struct rmr_dirty_id_map *map)
+{
+	el_flp *flp_ptr;
+	u64 no_of_slps, no_of_chunks;
+	bool is_last_flp;
+	u8 *slp;
+	int i, j;
+
+	for (i = 0; i < map->no_of_flp; i++) {
+		flp_ptr = (el_flp *)map->dirty_bitmap[i];
+		is_last_flp = (i == (map->no_of_flp - 1));
+
+		if (is_last_flp)
+			no_of_slps = map->no_of_slp_in_last_flp;
+		else
+			no_of_slps = NO_OF_SLP_PER_FLP;
+
+		for (j = 0; j < no_of_slps; j++, flp_ptr++) {
+			slp = (u8 *)(*flp_ptr);
+
+			if (is_last_flp && j == (no_of_slps - 1))
+				no_of_chunks = map->no_of_chunk_in_last_slp;
+			else
+				no_of_chunks = NO_OF_CHUNKS_PER_PAGE;
+
+			/* Each chunk is represented by a byte */
+			rmr_map_hexdump_bitmap_buf(map->member_id, slp, no_of_chunks);
+		}
+	}
+}
+
+/**
+ * rmr_map_summary_format - Format a per-member dirty-chunk summary into buf
+ *
+ * @pool:	Pool whose maps to summarise
+ * @buf:	Output buffer (must be at least @buf_size bytes)
+ * @buf_size:	Size of @buf in bytes
+ *
+ * Description:
+ *	Output format (one line per member that has a map):
+ *	member <id>: [<idx0> <idx1> ...] <dirty_count>/<total> dirty
+ *	At most 50 dirty chunk indices are listed per member; if there
+ *	are more, a "..." marker appears before the closing bracket.
+ *
+ * Context: caller must hold srcu pool->map_srcu.
+ *
+ * Return: number of bytes written (excluding trailing NUL).
+ */
+int rmr_map_summary_format(struct rmr_pool *pool, char *buf, size_t buf_size)
+{
+	struct rmr_dirty_id_map *map;
+	el_flp *flp_ptr;
+	u64 no_of_slps, no_of_chunks_in_slp;
+	u64 chunk_idx, dirty_count;
+	bool is_last_flp;
+	u8 *slp;
+	int printed_ids;
+	int pos = 0;
+	int i, fi, si;
+
+	for (i = 0; i < RMR_POOL_MAX_SESS; i++) {
+		map = rcu_dereference(pool->maps[i]);
+		if (!map)
+			continue;
+
+		pos += scnprintf(buf + pos, buf_size - pos,
+				 "member %u: [", map->member_id);
+
+		dirty_count = 0;
+		chunk_idx = 0;
+		printed_ids = 0;
+		for (fi = 0; fi < map->no_of_flp; fi++) {
+			flp_ptr = (el_flp *)map->dirty_bitmap[fi];
+			is_last_flp = (fi == (map->no_of_flp - 1));
+			no_of_slps = is_last_flp ?
+				map->no_of_slp_in_last_flp : NO_OF_SLP_PER_FLP;
+
+			for (si = 0; si < no_of_slps; si++, flp_ptr++) {
+				u64 ci;
+
+				slp = (u8 *)(*flp_ptr);
+				no_of_chunks_in_slp =
+					(is_last_flp && si == (no_of_slps - 1)) ?
+					map->no_of_chunk_in_last_slp :
+					NO_OF_CHUNKS_PER_PAGE;
+
+				for (ci = 0; ci < no_of_chunks_in_slp;
+				     ci++, chunk_idx++) {
+					if (!(slp[ci] & (1 << CHUNK_DIRTY_BIT)))
+						continue;
+					dirty_count++;
+					/* Cap listed IDs to fit all members in PAGE_SIZE */
+					if (printed_ids < 50) {
+						pos += scnprintf(buf + pos,
+								 buf_size - pos,
+								 "%llu ", chunk_idx);
+						printed_ids++;
+					}
+				}
+			}
+		}
+
+		/* Overwrite trailing space before ']' */
+		if (pos > 0 && buf[pos - 1] == ' ')
+			pos--;
+		if (printed_ids < dirty_count)
+			pos += scnprintf(buf + pos, buf_size - pos,
+					 "...] %llu/%llu dirty\n",
+					 dirty_count, map->no_of_chunks);
+		else
+			pos += scnprintf(buf + pos, buf_size - pos,
+					 "] %llu/%llu dirty\n",
+					 dirty_count, map->no_of_chunks);
+	}
+
+	return pos;
+}
+
+void rmr_map_bidump_bitmap_buf(void *buf, u8 member_id, u32 buf_long)
+{
+	char box[65];
+	u64 *buf_byte;
+	u64 the_byte;
+	int i, j;
+	u32 count = 0;
+
+	buf_byte = buf;
+
+	pr_info("%s: bitmap for member %d dump in binary, the size in longs %u\n",
+		__func__, member_id, buf_long);
+	while (count < buf_long) {
+		the_byte = *(buf_byte + count);
+		for (i = 63, j = 0; i >= 0; i--, j++)
+			box[j] = (the_byte & (1ULL << i)) ? '1' : '0';
+		box[j] = '\0';
+		pr_cont("[%s]", box);
+		count++;
+	}
+
+	pr_info("\n");
+	pr_info("---------------------------------------------------------\n");
+}
diff --git a/drivers/infiniband/ulp/rmr/rmr-pool.c b/drivers/infiniband/ulp/rmr/rmr-pool.c
new file mode 100644
index 000000000000..5e5632d9d701
--- /dev/null
+++ b/drivers/infiniband/ulp/rmr/rmr-pool.c
@@ -0,0 +1,401 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Reliable multicast over RTRS (RMR)
+ *
+ * Copyright (c) 2026 IONOS SE
+ */
+
+#include <linux/wait.h>
+#include <linux/sched.h>
+#include <linux/slab.h>
+
+#include "rmr-pool.h"
+
+LIST_HEAD(pool_list);
+DEFINE_MUTEX(pool_mutex);	/* mutex to protect pool_list */
+struct kmem_cache *rmr_map_entry_cachep;
+
+const char *rmr_get_cmd_name(enum rmr_msg_cmd_type cmd)
+{
+	switch (cmd) {
+	case RMR_CMD_MAP_READY: return "RMR_CMD_MAP_READY";
+	case RMR_CMD_MAP_SEND: return "RMR_CMD_MAP_SEND";
+	case RMR_CMD_SEND_MAP_BUF: return "RMR_CMD_SEND_MAP_BUF";
+	case RMR_CMD_MAP_BUF_DONE: return "RMR_CMD_MAP_BUF_DONE";
+	case RMR_CMD_MAP_DONE: return "RMR_CMD_MAP_DONE";
+	case RMR_CMD_MAP_DISABLE: return "RMR_CMD_MAP_DISABLE";
+	case RMR_CMD_READ_MAP_BUF: return "RMR_CMD_READ_MAP_BUF";
+	case RMR_CMD_MAP_CHECK: return "RMR_CMD_MAP_CHECK";
+	case RMR_CMD_LAST_IO_TO_MAP: return "RMR_CMD_LAST_IO_TO_MAP";
+	case RMR_CMD_STORE_CHECK: return "RMR_CMD_STORE_CHECK";
+	case RMR_CMD_MAP_TEST: return "RMR_CMD_MAP_TEST";
+	case RMR_CMD_SEND_MD_BUF: return "RMR_CMD_SEND_MD_BUF";
+	case RMR_CMD_MD_SEND: return "RMR_CMD_MD_SEND";
+
+	case RMR_CMD_MAP_GET_VER: return "RMR_CMD_MAP_GET_VER";
+	case RMR_CMD_MAP_SET_VER: return "RMR_CMD_MAP_SET_VER";
+	case RMR_CMD_DISCARD_CLEAR_FLAG: return "RMR_CMD_DISCARD_CLEAR_FLAG";
+	case RMR_CMD_SEND_DISCARD: return "RMR_CMD_SEND_DISCARD";
+
+	case RMR_MAP_CMD_MAX: return "RMR_MAP_CMD_MAX";
+
+	case RMR_CMD_POOL_INFO: return "RMR_CMD_POOL_INFO";
+	case RMR_CMD_JOIN_POOL: return "RMR_CMD_JOIN_POOL";
+
+	case RMR_CMD_REJOIN_POOL: return "RMR_CMD_REJOIN_POOL";
+
+	case RMR_CMD_LEAVE_POOL: return "RMR_CMD_LEAVE_POOL";
+	case RMR_CMD_ENABLE_POOL: return "RMR_CMD_ENABLE_POOL";
+
+	case RMR_CMD_USER: return "RMR_CMD_USER";
+
+	case RMR_POOL_CMD_MAX: return "RMR_POOL_CMD_MAX";
+
+	default: return "Unknown command";
+	}
+}
+
+void free_pool(struct rmr_pool *pool)
+{
+	WARN_ON(!list_empty(&pool->sess_list));
+
+	cleanup_srcu_struct(&pool->sess_list_srcu);
+	cleanup_srcu_struct(&pool->map_srcu);
+
+	if (!list_empty(&pool->entry)) {
+		mutex_lock(&pool_mutex);
+		list_del(&pool->entry);
+		mutex_unlock(&pool_mutex);
+	}
+
+	percpu_ref_exit(&pool->ids_inflight_ref);
+	kfree(pool);
+}
+
+/**
+ * rmr_find_pool_by_group_id - Find a pool with group_id in global pool list
+ *
+ * @group_id: Group_id of the pool being searched
+ *
+ * Locks:
+ *    Caller should hold global pool_mutex
+ */
+struct rmr_pool *rmr_find_pool_by_group_id(u32 group_id)
+{
+	struct rmr_pool *pool;
+
+	list_for_each_entry(pool, &pool_list, entry)
+		if (pool->group_id == group_id)
+			return pool;
+
+	return NULL;
+}
+
+/**
+ * rmr_find_pool - Find a pool named poolname in the global pool list
+ *
+ * @poolname: Name of the pool to be searched
+ *
+ * Locks:
+ *    Caller must hold global pool_mutex
+ */
+struct rmr_pool *rmr_find_pool(const char *poolname)
+{
+	struct rmr_pool *pool;
+
+	lockdep_assert_held(&pool_mutex);
+
+	list_for_each_entry(pool, &pool_list, entry) {
+		if (!strcmp(poolname, pool->poolname))
+			return pool;
+	}
+
+	return NULL;
+}
+
+static void rmr_pool_inflight_ref_release(struct percpu_ref *ref)
+{
+	struct rmr_pool *pool = container_of(ref, struct rmr_pool, ids_inflight_ref);
+
+	complete_all(&pool->complete_done);
+}
+
+void rmr_pool_confirm_inflight_ref(struct percpu_ref *ref)
+{
+	struct rmr_pool *pool = container_of(ref, struct rmr_pool, ids_inflight_ref);
+
+	complete_all(&pool->confirm_done);
+}
+
+static struct rmr_pool *alloc_pool(const char *poolname, u32 group_id)
+{
+	struct rmr_pool *pool;
+	int ret;
+
+	pr_debug("%s: allocate pool %s with group_id %u\n",
+		 __func__, poolname, group_id);
+
+	if (strlen(poolname) > NAME_MAX) {
+		pr_err("%s: Failed to create '%s': name too long\n", __func__, poolname);
+		return ERR_PTR(-EINVAL);
+	}
+
+	pool = kzalloc(sizeof(struct rmr_pool), GFP_KERNEL);
+	if (unlikely(!pool))
+		return ERR_PTR(-ENOMEM);
+
+	ret = init_srcu_struct(&pool->sess_list_srcu);
+	if (ret) {
+		pr_err("%s: Sess list srcu init failed, err: %d\n", __func__, ret);
+		pool = ERR_PTR(ret);
+		goto free_pool;
+	}
+
+	ret = init_srcu_struct(&pool->map_srcu);
+	if (ret) {
+		pr_err("%s: Map srcu init failed, err: %d\n", __func__, ret);
+		pool = ERR_PTR(ret);
+		goto cleanup_sess_srcu;
+	}
+
+	ret = percpu_ref_init(&pool->ids_inflight_ref,
+			      rmr_pool_inflight_ref_release,
+			      PERCPU_REF_ALLOW_REINIT, GFP_KERNEL);
+	if (ret) {
+		pr_err("%s: Percpu reference init failed for pool %s\n", __func__, poolname);
+		pool = ERR_PTR(ret);
+		goto cleanup_map_srcu;
+	}
+
+	pool->group_id = group_id;
+	pool->map_ver = 1;
+	pool->mapped_size = 0;
+	xa_init_flags(&pool->stg_members, XA_FLAGS_ALLOC);
+	init_completion(&pool->complete_done);
+	init_completion(&pool->confirm_done);
+	mutex_init(&pool->sess_lock);
+	mutex_init(&pool->maps_lock);
+	INIT_LIST_HEAD(&pool->entry);
+	INIT_LIST_HEAD(&pool->sess_list);
+
+	init_completion(&pool->discard_done);
+	atomic_set(&pool->discard_waiting, 0);
+	atomic_set(&pool->normal_count, 0);
+
+	strscpy(pool->poolname, poolname, sizeof(pool->poolname));
+
+	return pool;
+
+cleanup_map_srcu:
+	cleanup_srcu_struct(&pool->map_srcu);
+cleanup_sess_srcu:
+	cleanup_srcu_struct(&pool->sess_list_srcu);
+free_pool:
+	kfree(pool);
+	return pool;
+}
+
+struct rmr_pool *rmr_create_pool(const char *poolname, void *priv)
+{
+	u32 group_id;
+	struct rmr_pool *pool;
+
+	mutex_lock(&pool_mutex);
+
+	pool = rmr_find_pool(poolname);
+	if (unlikely(pool)) {
+		pr_err("Pool '%s' already exists\n", poolname);
+		pool = ERR_PTR(-EEXIST);
+		goto out;
+	}
+
+	/* Calculate the poolname hash */
+	group_id = rmr_pool_hash(poolname);
+
+	/* Double ensure there is no hash-clash */
+	pool = rmr_find_pool_by_group_id(group_id);
+	if (unlikely(pool)) {
+		pr_err("Pool '%s' already exists\n", poolname);
+		pool = ERR_PTR(-EEXIST);
+		goto out;
+	}
+
+	pool = alloc_pool(poolname, group_id);
+	if (IS_ERR(pool)) {
+		pr_err("Pool allocation failed for pool %s\n", poolname);
+		goto out;
+	}
+
+	list_add(&pool->entry, &pool_list);
+	pool->priv = priv;
+	pool->pool_md.magic = RMR_POOL_MD_MAGIC;
+
+out:
+	mutex_unlock(&pool_mutex);
+	return pool;
+}
+
+/**
+ * rmr_pool_maps_to_buf - Copy dirty_bitmap buffer of pool to buf
+ *
+ * @pool:	The pool whose map is to be copied
+ * @map_idx:	The map index in the pool's map array
+ * @offset:	The offset to read from in the maps dirty_bitmap buffer
+ * @buf:	Pointer to buf where to copy the dirty_bitmap buffer
+ * @buflen:	Length of the buf available to copy to
+ * @filter:	TODO
+ *
+ * Description:
+ *	This function is one half of the (map <-> buf) pair. It is used to save map into a buf.
+ *	The other half is rmr_pool_save_map, which is used to save a buf into the map.
+ *	This function is used while both sending a map and reading a map.
+ *	The process for both of them is largely same.
+ *
+ *	The relevant params like member_id, offset for the dirty_bitmap buffer
+ *	are stored in the rmr_map_buf_hdr, which is kept at the starting of buf.
+ *
+ *	The caller has to take care of sending the correct map index and offset to copy from.
+ *	For this, the function provides some help in the form of updating the map_idx and
+ *	offset values (for map send), and storing it those in map_buf_hdr (for map read).
+ *
+ * Return value:
+ *	0 If there is no more data to send
+ *	Total size copied to buf
+ */
+int rmr_pool_maps_to_buf(struct rmr_pool *pool, u8 *map_idx, u64 *slp_idx,
+			 void *buf, size_t buflen, rmr_map_filter filter)
+{
+	struct rmr_map_buf_hdr *map_buf_hdr = (struct rmr_map_buf_hdr *)buf;
+	struct rmr_dirty_id_map *map = NULL;
+	int lock_idx;
+	u64 no_of_slp;
+
+	/* Adjust buf and buflen */
+	buf += sizeof(struct rmr_map_buf_hdr);
+	buflen -= sizeof(struct rmr_map_buf_hdr);
+
+	lock_idx = srcu_read_lock(&pool->map_srcu);
+	for ( ; ; *map_idx += 1) {
+
+		if (*map_idx >= pool->maps_cnt) {
+			srcu_read_unlock(&pool->map_srcu, lock_idx);
+			return 0;
+		}
+
+		map = rcu_dereference(pool->maps[*map_idx]);
+		if (map)
+			break;
+	}
+
+	map_buf_hdr->version = RMR_MAP_FORMAT_VER;
+
+	/* This is for the destination, to inform where to store */
+	map_buf_hdr->member_id = map->member_id;
+	map_buf_hdr->dst_slp_idx = (*slp_idx);
+
+	/*
+	 * SLPs are pages. Duh!
+	 */
+	no_of_slp = buflen >> PAGE_SHIFT;
+	no_of_slp = min(no_of_slp, (map->total_slp - *slp_idx));
+	rmr_map_slps_to_buf(map, *slp_idx, no_of_slp, buf);
+	map_buf_hdr->buf_size = no_of_slp * PAGE_SIZE;
+
+	if ((*slp_idx + no_of_slp) >= map->total_slp) {
+		/*
+		 * All done for this map.
+		 * Now move on to the next one, and reset the index.
+		 */
+		*map_idx += 1;
+		*slp_idx = 0;
+	} else {
+		/*
+		 * Copy the number of SLPs we can, and increment the index.
+		 */
+		*slp_idx += no_of_slp;
+	}
+
+	pr_info("%s: buf_size %u, buflen w/o hdr %lu\n",
+		__func__, map_buf_hdr->buf_size, buflen);
+
+	/* This is for MAP_READ, to inform where to ask from next */
+	map_buf_hdr->map_idx = *map_idx;
+	map_buf_hdr->slp_idx = *slp_idx;
+
+	srcu_read_unlock(&pool->map_srcu, lock_idx);
+
+	return (map_buf_hdr->buf_size + sizeof(struct rmr_map_buf_hdr));
+}
+
+/**
+ * rmr_pool_save_map - Copy given buf to dirty_bitmap buffer of pool
+ *
+ * @pool:	The pool whose map is the dest for the copy
+ * @buf:	Pointer to buf from where to copy
+ * @buflen:	Length of the buf available to copy
+ * @test_only:	Only test if the buf given matches with dirty_bitmap buf of pool
+ * @map_clean:	TODO
+ *
+ * Description:
+ *	This function is the other half of the (map <-> buf) pair.
+ *	It saves buf into the map of pool. The relevant params are read from the
+ *	rmr_map_buf_hdr which lies in the start of the given buf.
+ *
+ * Return value:
+ *	0 on success
+ *	-errno on error
+ */
+int rmr_pool_save_map(struct rmr_pool *pool, void *buf, size_t buflen,
+		      bool test_only)
+{
+	struct rmr_map_buf_hdr *map_buf_hdr = (struct rmr_map_buf_hdr *)buf;
+	struct rmr_dirty_id_map *map = NULL;
+	int err = 0, lock_idx;
+	u32 buf_size;
+	u64 slp_idx;
+
+	if (map_buf_hdr->version != RMR_MAP_FORMAT_VER) {
+		pr_err("Wrong map format. Expected %d but received %llu\n",
+		       RMR_MAP_FORMAT_VER, map_buf_hdr->version);
+		return -EINVAL;
+	}
+
+	/* Adjust buf and buflen */
+	buf += sizeof(struct rmr_map_buf_hdr);
+	buflen -= sizeof(struct rmr_map_buf_hdr);
+
+	lock_idx = srcu_read_lock(&pool->map_srcu);
+	map = rmr_pool_find_map(pool, map_buf_hdr->member_id);
+	if (!map) {
+		pr_err("%s: No map found for member_id %llu\n",
+		       __func__, map_buf_hdr->member_id);
+		err = -ENOENT;
+		goto out;
+	}
+
+	slp_idx = map_buf_hdr->dst_slp_idx;
+	buf_size = map_buf_hdr->buf_size;
+
+	pr_info("%s: For pool %s, received map for %llu, slp_idx %llu, buf_size %u, buflen %lu\n",
+		__func__, pool->poolname, map_buf_hdr->member_id, slp_idx, buf_size, buflen);
+
+	/* Sanity */
+	WARN_ON(buf_size > buflen);
+	WARN_ON(buf_size % PAGE_SIZE);
+
+	pr_info("%s: buf_size %u, buflen w/o hdr %lu\n", __func__, map_buf_hdr->buf_size, buflen);
+
+	/*
+	 * The buf_size would be a factor of PAGE_SIZE,
+	 * and thats how we know no_of_slp(s) to save.
+	 */
+	if (!rmr_map_buf_to_slps(map, buf, buf_size, slp_idx, test_only)) {
+		pr_err("%s: rmr_map_buf_to_slps failed\n", __func__);
+		goto out;
+	}
+
+out:
+	srcu_read_unlock(&pool->map_srcu, lock_idx);
+
+	return err;
+}
diff --git a/drivers/infiniband/ulp/rmr/rmr-req.c b/drivers/infiniband/ulp/rmr/rmr-req.c
new file mode 100644
index 000000000000..d748579c489c
--- /dev/null
+++ b/drivers/infiniband/ulp/rmr/rmr-req.c
@@ -0,0 +1,796 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Reliable multicast over RTRS (RMR)
+ *
+ * Copyright (c) 2026 IONOS SE
+ */
+
+#include <linux/slab.h>
+
+#include "rmr-req.h"
+#include "rmr-srv.h"
+#include "rmr-clt.h"
+
+extern struct kmem_cache *rmr_req_cachep;
+extern struct kmem_cache *rmr_map_entry_cachep;
+extern struct rmr_store_ops *pstore_ops;
+
+static void rmr_req_complete(struct rmr_srv_req *req);
+static void rmr_req_store_done(struct rmr_srv_req *req);
+static void rmr_req_sync_failed(struct rmr_srv_req *req);
+static void rmr_req_send_map_clear(struct rmr_srv_req *req);
+static void rmr_req_sync_complete(struct rmr_srv_req *req);
+static void rmr_req_store(struct rmr_srv_req *req);
+
+/**
+ * rmr_srv_req_resp - Response from the lower level module
+ *
+ * @req:	Request to be processed
+ * @err:	Error value
+ *
+ * Description:
+ *	This function is the return point from the below module
+ *	where IO is submitted.
+ *
+ * Context:
+ *	In this function the request should always be in state RMR_REQ_STATE_STORE
+ */
+void rmr_srv_req_resp(struct rmr_srv_req *req, int err)
+{
+	/*
+	 * Use the error sent from lower layer
+	 */
+	req->err = err;
+
+	/*
+	 * For Normal (non-sync) requests we handle both non-error and error cases from one
+	 * place. Since its simple.
+	 */
+	if (rmr_op(req->flags) != RMR_OP_SYNCREQ) {
+		rmr_req_complete(req);
+		return;
+	}
+
+	/*
+	 * Sync requests are complicated, since it needs extra post-processing
+	 * once IO is done for us.
+	 *
+	 * 1) In case of no failure, we need to send map clear to other nodes,
+	 *    since they think we are still dirty for this chunk.
+	 *
+	 * 2) We need to check for waiting IO in entry->wait_list, and kick them.
+	 */
+	if (!req->err)
+		rmr_req_store_done(req);
+	else
+		rmr_req_sync_failed(req);
+}
+EXPORT_SYMBOL(rmr_srv_req_resp);
+
+/**
+ * rmr_srv_req_create - Create an rmr server request
+ *
+ * @msg:	IO message containing information
+ * @srv_pool:	Server pool creating this request
+ * @rtrs_op:	rtrs IO context
+ * @data:	pointer to data buf
+ * @datalen:	len of data buf
+ * @endreq:	Function to be called at the end of rmr request processing
+ *
+ * Description:
+ *	RMR server request are base structures which holds the IO while they are being processed.
+ *	They go through a state machine, while a number of checks are done. IOs which are
+ *	destined for a chunk that is dirty, are paused while that chunk is synced.
+ *
+ * Return:
+ *	Pointer to the create rmr server request on success
+ *	Error pointer on failure
+ */
+struct rmr_srv_req *rmr_srv_req_create(const struct rmr_msg_io *msg, struct rmr_srv_pool *srv_pool,
+				       struct rtrs_srv_op *rtrs_op, void *data, u32 datalen,
+				       void (*endreq)(struct rmr_srv_req *, int))
+{
+	struct rmr_srv_req *req;
+	struct rmr_srv_io_store *store = srv_pool->io_store;
+	int i;
+
+	if (!store || !atomic_read(&srv_pool->store_state)) {
+		pr_err("%s: store not set, or srv_pool not in correct state %s\n",
+		       __func__, srv_pool->pool->poolname);
+		return ERR_PTR(-ENODEV);
+	}
+
+	req = kmem_cache_zalloc(rmr_req_cachep, GFP_KERNEL);
+	if (!req) {
+		pr_err("cannot allocate memory for rmr_req.\n");
+		return ERR_PTR(-ENOMEM);
+	}
+
+	req->id.a = le64_to_cpu(msg->id_a);
+	req->id.b = le64_to_cpu(msg->id_b);
+
+	req->offset = le32_to_cpu(msg->offset);
+	req->length = le32_to_cpu(msg->length);
+	req->flags = le32_to_cpu(msg->flags);
+	req->prio = le16_to_cpu(msg->prio);
+
+	req->mem_id = le32_to_cpu(msg->mem_id);
+	for (i = 0; i < msg->failed_cnt; i++)
+		req->failed_srv_id[i] = msg->failed_id[i];
+
+	req->failed_cnt = msg->failed_cnt;
+	req->map_ver = le64_to_cpu(msg->map_ver);
+	req->sync = msg->sync;
+
+	req->data = data;
+	req->datalen = datalen;
+	req->rtrs_op = rtrs_op;
+	req->srv_pool = srv_pool;
+	req->store = store;
+	req->endreq = endreq;
+
+	pr_debug("req %p, chunk_size %u\n", req, req->srv_pool->pool->chunk_size);
+
+	return req;
+}
+
+struct rmr_srv_req *rmr_srv_md_req_create(struct rmr_srv_pool *srv_pool,
+					  struct rtrs_srv_op *rtrs_op, void *data,
+					  u32 offset, u32 len, unsigned long flags,
+					  void (*endreq)(struct rmr_srv_req *, int))
+{
+	struct rmr_srv_req *req;
+	struct rmr_srv_io_store *store = srv_pool->io_store;
+
+	if (!store) {
+		pr_err("No store_id registered for srv pool %s\n", srv_pool->pool->poolname);
+		return ERR_PTR(-ENODEV);
+	}
+
+	req = kmem_cache_zalloc(rmr_req_cachep, GFP_KERNEL);
+	if (!req) {
+		pr_err("cannot allocate memory for rmr_req.\n");
+		return ERR_PTR(-ENOMEM);
+	}
+	req->offset = offset;
+	req->length = len;
+	req->flags = flags;
+	req->sync = false; /* A md req is always non-sync */
+
+	req->data = data;
+	req->rtrs_op = rtrs_op;
+	req->srv_pool = srv_pool;
+	req->store = store;
+	req->endreq = endreq;
+
+	pr_debug("md req %p, len %u\n", req, len);
+
+	return req;
+}
+
+void rmr_req_submit(struct rmr_srv_req *req);
+static void rmr_req_sched(struct work_struct *work)
+{
+	struct rmr_srv_req *req = container_of(work, struct rmr_srv_req, work);
+
+	pr_debug("scheduled work process for req %p\n", req);
+	if (req->err)
+		rmr_req_complete(req);
+	else
+		rmr_req_submit(req);
+}
+
+void rmr_process_wait_list(struct rmr_map_entry *entry, int err)
+{
+	struct llist_node *first, *next;
+	struct rmr_srv_req *req;
+
+	pr_debug("processing wait list for entry %p, sync_cnt=%d\n",
+		 entry, atomic_read(&entry->sync_cnt));
+
+	WARN_ON(atomic_read(&entry->sync_cnt) > 0);
+
+	while (!llist_empty(&entry->wait_list)) {
+		first = llist_del_all(&entry->wait_list);
+		while (first) {
+			next = first->next;
+			req = llist_entry(first, struct rmr_srv_req, node);
+
+			pr_debug("process waiting req %p id (%llu, %llu) flags %u\n",
+				 req, req->id.a, req->id.b, req->flags);
+			if (err) {
+				pr_err("fail waiting req %p id (%llu, %llu) flags %u err %d\n",
+				       req, req->id.a, req->id.b, req->flags, err);
+				req->err = -EIO;
+			}
+
+			pr_debug("schedule processing req %p with err %d\n", req, req->err);
+			INIT_WORK(&req->work, rmr_req_sched);
+			schedule_work(&req->work);
+
+			first = next;
+		}
+	}
+}
+
+void rmr_req_submit(struct rmr_srv_req *req)
+{
+	struct rmr_srv_pool *srv_pool = req->srv_pool;
+	struct rmr_map_entry *entry;
+	struct rmr_dirty_id_map *map;
+
+	if (rmr_op(req->flags) == RMR_OP_FLUSH && !req->length) {
+		rmr_req_store(req);
+		return;
+	}
+
+	pr_debug("check map for req %p flag %u request id [%llu, %llu] offset %u length %u\n",
+		 req, req->flags,
+		 req->id.a, req->id.b, req->offset, req->length);
+
+	map = rmr_pool_find_map(srv_pool->pool, srv_pool->member_id);
+	if (!map) {
+		pr_err("no map found for pool_id %u\n", srv_pool->member_id);
+		req->err = -EINVAL;
+		goto err;
+	}
+
+	rcu_read_lock();
+	entry = rmr_map_get_dirty_entry(map, req->id);
+	if (!entry) {
+		/*
+		 * The chunk containing data for this req is NOT dirty for us
+		 */
+		pr_debug("check map for req %p flags %u request id [%llu, %llu], no entry in the map\n",
+			 req, req->flags, req->id.a, req->id.b);
+		rcu_read_unlock();
+		rmr_req_store(req);
+		return;
+	} else {
+		/*
+		 * The chunk for this data is dirty for us.
+		 *
+		 * we have 2 cases.
+		 *
+		 * 1) Its coming from a sync rmr-clt (Its an internal read).
+		 * Then, fail the IO, since we do not want to end up in a deadlock,
+		 * or go through multiple hops for a single read. The sender can try some other
+		 * node itself.
+		 */
+		if (req->sync) {
+			WARN_ON(rmr_op(req->flags) != RMR_OP_READ);
+			rcu_read_unlock();
+			req->err = -EIO;
+			goto err;
+		}
+
+		/*
+		 * 2) If its coming from a non-sync rmr-clt,
+		 *    simply go ahead with syncing the data first.
+		 */
+		llist_add(&req->node, &entry->wait_list);
+		pr_debug("%s: req %p flags %u id (%llu %llu) added to wait list. sync_cnt %d\n",
+			 __func__, req, req->flags, req->id.a, req->id.b,
+			 atomic_read(&entry->sync_cnt));
+
+		rcu_read_unlock();
+		/*
+		 * If we are the first who grabs the entry then start sync.
+		 *
+		 * Otherwise, the one syncing the data would pick us up from the entry->wait_list
+		 * and kick us. So simply exit for now.
+		 */
+		if (atomic_cmpxchg(&entry->sync_cnt, -1, 0) == -1) {
+			int err;
+
+			req->priv = entry;
+			err = rmr_srv_sync_chunk_id(srv_pool, entry, req->id, false);
+			if (err) {
+				atomic_set(&entry->sync_cnt, -1);
+				rmr_process_wait_list(entry, err);
+			}
+		}
+	}
+
+	return;
+
+err:
+	rmr_req_complete(req);
+}
+
+static void rmr_req_store(struct rmr_srv_req *req)
+{
+	int err;
+
+	pr_debug("submit to store req %p flags %u request id [%llu, %llu] offset %u length %u\n",
+		 req, req->flags,
+		 req->id.a, req->id.b, req->offset, req->length);
+
+	err = req->store->ops->submit_req(req->store->priv, req->data, req->offset,
+					  req->length, req->flags, req->prio, req);
+	if (err) {
+		pr_err("%s: error submitting req %p, err %d\n", __func__, req, err);
+		req->err = err;
+		if (rmr_op(req->flags) == RMR_OP_SYNCREQ)
+			rmr_req_sync_failed(req);
+		else
+			rmr_req_complete(req);
+	}
+}
+
+static void rmr_md_req_store(struct rmr_srv_req *req)
+{
+	int err;
+
+	err = req->store->ops->submit_md_req(req->store->priv, req->data, req->offset, req->length,
+					     req->flags, req);
+	if (err) {
+		req->endreq(req, err);
+		pr_err("release md req %p, flags %u\n", req, req->flags);
+		kmem_cache_free(rmr_req_cachep, req);
+	}
+}
+
+/* md req submission path*/
+void rmr_md_req_submit(struct rmr_srv_req *req)
+{
+	rmr_md_req_store(req);
+}
+
+static void rmr_req_sched_store(struct work_struct *work)
+{
+	struct rmr_srv_req *req = container_of(work, struct rmr_srv_req, work);
+
+	pr_debug("scheduled store for req %p\n", req);
+	rmr_req_store(req);
+}
+
+static void rmr_req_remote_io_done(void *priv, int err)
+{
+	struct rmr_srv_req *req = priv;
+
+	pr_debug("called for req %p, err code %d\n", req, err);
+
+	rmr_clt_put_iu(req->srv_pool->clt, req->iu);
+
+	if (err) {
+		req->err = err;
+		rmr_req_sync_failed(req);
+		return;
+	}
+
+	pr_debug("schedule store for req %p with err %d\n", req, req->err);
+	INIT_WORK(&req->work, rmr_req_sched_store);
+	schedule_work(&req->work);
+}
+
+static void rmr_req_remote_read(struct rmr_srv_req *req)
+{
+	struct rmr_srv_pool *srv_pool = req->srv_pool;
+	struct rmr_pool *clt = srv_pool->clt;
+	unsigned long flags;
+	int err;
+
+	pr_debug("redirecting req id (%llu, %llu)\n",
+		 req->id.a, req->id.b);
+	if (!clt) {
+		pr_err("No srv pool assigned for redirect for %s\n", srv_pool->pool->poolname);
+		err = -EINVAL;
+		goto err;
+	}
+
+	if (rmr_op(req->flags) == RMR_OP_SYNCREQ)
+		flags = RMR_OP_READ;
+	else
+		flags = req->flags;
+
+	req->iu = rmr_clt_get_iu(clt, flags, WAIT);
+	if (IS_ERR_OR_NULL(req->iu)) {
+		pr_err("Failed to get rmr_iu for req id (%llu, %llu)\n",
+		       req->id.a, req->id.b);
+		err = -EINVAL;
+		goto err;
+	}
+
+	sg_init_one(&req->sg, req->data, req->datalen);
+
+	pr_debug("After sg_init_one nents=%d\n", sg_nents(&req->sg));
+
+	/* look at the flags here! */
+	err = rmr_clt_request(clt, req->iu, req->offset, req->length, flags,
+			      req->prio, req, rmr_req_remote_io_done,
+			      &req->sg, sg_nents(&req->sg));
+	if (err) {
+		pr_err("rmr_clt_request error %d\n", err);
+		rmr_clt_put_iu(clt, req->iu);
+		err = -EREMOTEIO;
+		goto err;
+	}
+
+	pr_debug("remote read submitted\n");
+	return;
+
+err:
+	req->err = err;
+	rmr_req_sync_failed(req);
+}
+
+static void rmr_sync_req_sched(struct work_struct *work)
+{
+	struct rmr_srv_req *req = container_of(work, struct rmr_srv_req, work);
+
+	pr_debug("scheduled work process for req %p\n", req);
+	if (req->err)
+		rmr_req_sync_complete(req);
+	else
+		rmr_req_send_map_clear(req);
+}
+
+static void rmr_req_complete(struct rmr_srv_req *req)
+{
+	pr_debug("send completeion for req %p flags %u request id (%llu, %llu) offset %u length %u err %d\n",
+		 req, req->flags,
+		 req->id.a, req->id.b, req->offset, req->length, req->err);
+
+	/* endreq() records the Last IO buffer accordingly. */
+	req->endreq(req, req->err);
+
+	pr_debug("release req %p, flags %u\n", req, req->flags);
+
+	kmem_cache_free(rmr_req_cachep, req);
+}
+
+static struct rmr_srv_req *rmr_req_create_sync_req(struct rmr_srv_pool *srv_pool, rmr_id_t id,
+						   u32 offset, u32 len, bool from_sync,
+						   struct rmr_srv_req *parent)
+{
+	struct rmr_srv_req *req;
+	struct rmr_srv_io_store *store = srv_pool->io_store;
+
+	if (!store) {
+		pr_err("No store_id registered for srv pool %s\n", srv_pool->pool->poolname);
+		return ERR_PTR(-ENODEV);
+	}
+
+	req = kmem_cache_zalloc(rmr_req_cachep, GFP_KERNEL);
+	if (!req) {
+		pr_err("cannot allocate memory for rmr_req.\n");
+		return ERR_PTR(-ENOMEM);
+	}
+	req->id.a = id.a;
+	req->id.b = id.b;
+	req->flags = RMR_OP_SYNCREQ;
+	req->length = len;
+	req->offset = offset;
+	req->srv_pool = srv_pool;
+	req->store = store;
+	req->from_sync = from_sync;
+
+	if (parent) {
+		req->data = parent->data + offset;
+	} else {
+		req->data = kmalloc(req->length, GFP_KERNEL);
+		if (!req->data) {
+			pr_err("cannot allocate memory for sync req id [%llu, %llu]\n",
+			       req->id.a, req->id.b);
+			kmem_cache_free(rmr_req_cachep, req);
+			return ERR_PTR(-ENOMEM);
+		}
+	}
+	req->datalen = len;
+	req->parent = parent;
+
+	pr_debug("sync req %p created, flags %u request id (%llu, %llu) offset %u length %u parent %p\n",
+		 req, req->flags, req->id.a, req->id.b, req->offset, req->length, parent);
+
+	return req;
+}
+
+//should be called only if corresponding map entry has 0 sync cnt
+int rmr_srv_sync_chunk_id(struct rmr_srv_pool *srv_pool, struct rmr_map_entry *entry,
+			  rmr_id_t id, bool from_sync)
+{
+	struct rmr_pool *pool = srv_pool->pool;
+	struct rmr_dirty_id_map *map;
+	struct rmr_srv_req *parent_req;
+	u32 max_io_size, total_len, offset;
+
+	if (!srv_pool->clt) {
+		pr_err("For pool %s no sync pool assigned.\n", pool->poolname);
+		return -EINVAL;
+	}
+	max_io_size = srv_pool->max_sync_io_size;
+
+	map = rmr_pool_find_map(pool, srv_pool->member_id);
+	if (!map) {
+		pr_err("no map found for pool_id %u\n", srv_pool->member_id);
+		//TODO: handle this , probably initialize map, or just throw err?
+		return -EINVAL;
+	}
+
+	offset = CHUNK_TO_OFFSET(id.b, pool->chunk_size_shift);
+	total_len = pool->chunk_size;
+
+	pr_debug("pool %s sync id (%llu, %llu), total_len %u, max_io_size %u\n",
+		 pool->poolname, id.a, id.b, total_len, max_io_size);
+
+	/*
+	 * The parent_req starts with total_len, then get decremented in loop below.
+	 * The child reqs are filled one by one from end to second.
+	 *
+	 * Maybe refactor this to a simple loop?
+	 */
+	parent_req = rmr_req_create_sync_req(srv_pool, id, offset, total_len, from_sync, NULL);
+	if (IS_ERR_OR_NULL(parent_req)) {
+		pr_err("pool %s failed to create main sync req to sync id (%llu, %llu)\n",
+		       pool->poolname, id.a, id.b);
+		return -ENOMEM;
+	}
+	parent_req->priv = entry;
+
+	if (from_sync) {
+		if (rmr_srv_get_sync_permit(srv_pool)) {
+			pr_err("rmr_srv_sync_chunk_id failed to acquire permit for parent\n");
+			kfree(parent_req->data);
+			kmem_cache_free(rmr_req_cachep, parent_req);
+
+			return -EINVAL;
+		}
+	}
+
+	// inc ref cnt for parent_req
+	map_entry_get_sync(entry);
+	while (parent_req->length > max_io_size) {
+		struct rmr_srv_req *req;
+		u32 child_offset = offset + (parent_req->length - max_io_size);
+
+		// submit req
+		req = rmr_req_create_sync_req(srv_pool, id, (parent_req->length - max_io_size),
+					      max_io_size, from_sync, parent_req);
+		if (IS_ERR_OR_NULL(req)) {
+			pr_err("%s: Pool %s, id (%llu, %llu), offset %u, len %u, err %ld\n",
+			       __func__, pool->poolname, id.a, id.b,
+			       (parent_req->length - max_io_size), max_io_size, PTR_ERR(req));
+			parent_req->err = PTR_ERR(req);
+
+			rmr_req_sync_failed(parent_req);
+			return -EINVAL;
+		}
+
+		/*
+		 * The offset sent to rmr_req_create_sync_req for this req is in context of the
+		 * chunk. But the real offset for this req in the disk is this.
+		 */
+		req->offset = child_offset;
+
+		if (from_sync) {
+			if (rmr_srv_get_sync_permit(srv_pool)) {
+				pr_err("rmr_srv_sync_chunk_id failed to acquire permit for child\n");
+				kmem_cache_free(rmr_req_cachep, req);
+
+				parent_req->err = -EBUSY;
+				rmr_req_sync_failed(parent_req);
+				return -EINVAL;
+			}
+		}
+
+		// inc ref cnt for the child req just created
+		map_entry_get_sync(entry);
+		req->priv = entry;
+		rmr_req_remote_read(req);
+
+		parent_req->length -= max_io_size;
+		parent_req->datalen -= max_io_size;
+	}
+
+	//submit parent req
+	rmr_req_remote_read(parent_req);
+
+	return 0;
+}
+
+static void __release_parent_req(struct rcu_head *head)
+{
+	struct rmr_srv_req *req = container_of(head, struct rmr_srv_req, rcu);
+	struct rmr_map_entry *entry = req->priv;
+
+	pr_debug("is called for req=%p id=(%llu,%llu) err=%d, entry=%p\n",
+		 req, req->id.a, req->id.b, req->err, entry);
+
+	kfree(req->data);
+
+	//may be now we can stop saving entry in req->priv, but always rmr_map_find it
+	if (!req->err) {
+		pr_debug("req %p, completed all sync req, lets clean map\n", req);
+		rmr_process_wait_list(entry, 0);
+	} else {
+		pr_debug("req %p completed with err %d, process wait list\n",
+			 req, req->err);
+
+		/* sync of this entry failed, we reset the sync_cnt so that the other req
+		 * or sync thread could try again in the future. Without resetting, no one
+		 * could get the ref and start sync again.
+		 */
+		atomic_set(&entry->sync_cnt, -1);
+		rmr_process_wait_list(entry, req->err);
+	}
+
+	pr_debug("free entry %p for req %p\n", entry, req);
+	kmem_cache_free(rmr_map_entry_cachep, entry);
+
+	if (req->from_sync)
+		rmr_srv_put_sync_permit(req->srv_pool);
+
+	kmem_cache_free(rmr_req_cachep, req);
+}
+
+static void rmr_req_sync_complete(struct rmr_srv_req *req)
+{
+	struct rmr_srv_pool *srv_pool = req->srv_pool;
+	struct rmr_dirty_id_map *map;
+	int lock_idx;
+
+	pr_debug("sync_req %p completed for id (%llu, %llu), offset %u, len %u, err %d, from sync %d\n",
+		 req, req->id.a, req->id.b, req->offset, req->length,
+		 req->err, req->from_sync);
+
+	if (req->err)
+		rmr_srv_sync_req_failed(req->srv_pool);
+
+	pr_debug("release sync req %p, flags %u\n", req, req->flags);
+
+	/*
+	 * Only parent sync req own the allocated data.
+	 */
+	if (!req->parent) {
+		if (!req->err) {
+			map = rmr_pool_find_map(srv_pool->pool,
+						srv_pool->member_id);
+			if (map) {
+				lock_idx = srcu_read_lock(&srv_pool->pool->map_srcu);
+				rmr_map_unset_dirty(map, req->id,
+						    MAP_NO_FILTER);
+				srcu_read_unlock(&srv_pool->pool->map_srcu, lock_idx);
+			} else {
+				pr_err("no map found for pool_id %u\n", srv_pool->member_id);
+				req->err = -EINVAL;
+			}
+		}
+
+		pr_debug("req %p, completed all sync req, lets clean map\n",
+			 req);
+		call_rcu(&req->rcu, __release_parent_req);
+	} else {
+		/*
+		 * Child req has nothing to do but put permit and free
+		 */
+		if (req->from_sync)
+			rmr_srv_put_sync_permit(req->srv_pool);
+
+		kmem_cache_free(rmr_req_cachep, req);
+	}
+}
+
+static void rmr_req_sync_failed(struct rmr_srv_req *req)
+{
+	rmr_srv_sync_req_failed(req->srv_pool);
+
+	pr_err("pool %s sync req %p failed for id (%llu, %llu), offset %u, len %u, err %d\n",
+	       req->srv_pool->pool->poolname, req, req->id.a, req->id.b,
+	       req->offset, req->length, req->err);
+
+	rmr_req_store_done(req);
+}
+
+// this is actually very like rmr_req_remote_io_done but without rmr_clt_put_iu
+// do we want to have one function for both cases?
+static void rmr_req_map_clear_done(void *priv, int err)
+{
+	struct rmr_srv_req *req = priv;
+
+	rmr_clt_put_iu(req->srv_pool->clt, req->iu);
+
+	pr_debug("called for req %p, err code %d\n", req, err);
+	if (err)
+		pr_err("pool %s, sync req  with id (%llu, %llu) failed to send map clear\n",
+		       req->srv_pool->pool->poolname, req->id.a, req->id.b);
+
+	rmr_req_sync_complete(req);
+}
+
+static void rmr_req_store_done(struct rmr_srv_req *req)
+{
+	struct rmr_map_entry *entry = req->priv;
+	struct rmr_srv_req *parent_req = NULL;
+
+	pr_debug("called for req %p id (%llu, %llu ) offset %u len %u with parent req %p\n",
+		 req, req->id.a, req->id.b, req->offset, req->length, req->parent);
+
+	if (req->parent)
+		parent_req = req->parent;
+	else
+		parent_req = req;
+
+	if (req->err)
+		parent_req->err = req->err;
+
+	if (map_entry_put_sync(entry)) {
+		pr_debug("%s: for entry %p id (%llu, %llu) all sync req done.\n", __func__,
+			 entry, req->id.a, req->id.b);
+
+		/* We have to schedule the work of parent req from here since we are in the
+		 * interrupt context of either parent req or child req
+		 */
+		pr_debug("%s: process parent_req %p\n", __func__, parent_req);
+		INIT_WORK(&parent_req->work, rmr_sync_req_sched);
+		schedule_work(&parent_req->work);
+	}
+
+	if (req != parent_req) {
+		pr_debug("completing req %p with err %d\n", req, req->err);
+		rmr_req_sync_complete(req);
+	}
+}
+
+static void rmr_req_send_map_clear(struct rmr_srv_req *req)
+{
+	struct rmr_srv_pool *srv_pool = req->srv_pool;
+	struct rmr_pool *pool = srv_pool->clt;
+	struct rmr_iu *iu;
+	int err;
+
+	if (!pool) {
+		pr_err("Cannot send map clear. No pool client assigend for srv pool %s\n",
+		       req->srv_pool->pool->poolname);
+		req->err = -EINVAL;
+		goto err;
+	}
+
+	/*
+	 * We try to clear map, but if we fail to, we simply ignore the error.
+	 * Such zombie entries will be clear by rmr_srv_check_map_clear.
+	 */
+	iu = rmr_clt_get_iu(pool, RMR_OP_WRITE, WAIT);
+	if (IS_ERR_OR_NULL(iu)) {
+		pr_err("Failed to get rmr_iu for req id (%llu, %llu)\n",
+		       req->id.a, req->id.b);
+		goto err;
+	}
+
+	pr_debug("send map clear req id (%llu, %llu), member_id %u\n",
+		 req->id.a, req->id.b, srv_pool->member_id);
+
+	/*
+	 * For MAP_CLEAR, we only need rmr_id_t for chunk number,
+	 * and our member_id to say to clear the above chunk number for ths storage node.
+	 *
+	 * We also update the minimum members needed for map update.
+	 */
+	iu->msg.hdr.group_id = cpu_to_le32(pool->group_id);
+	iu->msg.hdr.type = cpu_to_le16(RMR_MSG_MAP_CLEAR);
+	iu->msg.hdr.__padding = 0;
+
+	iu->msg.id_a = cpu_to_le64(req->id.a);
+	iu->msg.id_b = cpu_to_le64(req->id.b);
+	iu->msg.member_id = srv_pool->member_id;
+
+	iu->msg.flags = cpu_to_le32(RMR_OP_WRITE);
+
+	iu->conf = rmr_req_map_clear_done;
+	iu->priv = req;
+
+	req->iu = iu;
+
+	err = rmr_clt_send_map_update(pool, req->iu);
+	if (err) {
+		pr_err("%s error %d\n", __func__, err);
+		rmr_clt_put_iu(pool, req->iu);
+		goto err;
+	}
+
+	pr_debug("send map clear submitted\n");
+	return;
+
+err:
+	rmr_req_sync_complete(req);
+}
-- 
2.43.0


  parent reply	other threads:[~2026-05-05  7:47 UTC|newest]

Thread overview: 15+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-05-05  7:46 [LSF/MM/BPF RFC PATCH 00/13] Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 01/13] RDMA/rmr: add public and private headers Md Haris Iqbal
2026-05-05  7:46 ` Md Haris Iqbal [this message]
2026-05-05  7:46 ` [PATCH 03/13] RDMA/rmr: client: main functionality Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 04/13] RDMA/rmr: client: sysfs interface functions Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 05/13] RDMA/rmr: server: main functionality Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 06/13] RDMA/rmr: server: sysfs interface functions Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 07/13] RDMA/rmr: include client and server modules into kernel compilation Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 08/13] block/brmr: add private headers with brmr protocol structs and helpers Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 09/13] block/brmr: client: main functionality Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 10/13] block/brmr: client: sysfs interface functions Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 11/13] block/brmr: server: main functionality Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 12/13] block/brmr: server: sysfs interface functions Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 13/13] block/brmr: include client and server modules into kernel compilation Md Haris Iqbal
2026-05-12 10:34 ` [LSF/MM/BPF RFC PATCH 00/13] Leon Romanovsky

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260505074644.195453-3-haris.iqbal@ionos.com \
    --to=haris.iqbal@ionos.com \
    --cc=axboe@kernel.dk \
    --cc=bvanassche@acm.org \
    --cc=hch@lst.de \
    --cc=jgg@ziepe.ca \
    --cc=jia.li@ionos.com \
    --cc=jinpu.wang@ionos.com \
    --cc=leon@kernel.org \
    --cc=linux-block@vger.kernel.org \
    --cc=linux-kernel@vger.kernel.org \
    --cc=linux-rdma@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox