From: Md Haris Iqbal <haris.iqbal@ionos.com>
To: linux-block@vger.kernel.org, linux-rdma@vger.kernel.org
Cc: linux-kernel@vger.kernel.org, axboe@kernel.dk,
bvanassche@acm.org, hch@lst.de, jgg@ziepe.ca, leon@kernel.org,
jinpu.wang@ionos.com, Md Haris Iqbal <haris.iqbal@ionos.com>,
Jia Li <jia.li@ionos.com>
Subject: [PATCH 03/13] RDMA/rmr: client: main functionality
Date: Tue, 5 May 2026 09:46:15 +0200
Message-ID: <20260505074644.195453-4-haris.iqbal@ionos.com>
In-Reply-To: <20260505074644.195453-1-haris.iqbal@ionos.com>
Add the RMR client implementation:
rmr-clt.c          client core: session and pool-session state
                   machine, RTRS transport setup, IO submission
                   and completion paths, command messaging.
rmr-map-mgmt.c     client-side dirty-map management: spreading
                   updates to pool members, handling map check
                   responses and resync coordination.
rmr-clt-stats.c    client per-pool statistics counters.
rmr-clt-trace.c    tracepoint definitions for client state
rmr-clt-trace.h    transitions and IO submission events.
The tracepoints are referenced from rmr-clt.c and
rmr-map-mgmt.c, so they are added together with the client core.
These files are not compiled until the modules are wired into the
build in a later patch in this series.
Signed-off-by: Md Haris Iqbal <haris.iqbal@ionos.com>
Signed-off-by: Jia Li <jia.li@ionos.com>
---
drivers/infiniband/ulp/rmr/rmr-clt-stats.c | 29 +
drivers/infiniband/ulp/rmr/rmr-clt-trace.c | 11 +
drivers/infiniband/ulp/rmr/rmr-clt-trace.h | 110 +
drivers/infiniband/ulp/rmr/rmr-clt.c | 3866 ++++++++++++++++++++
drivers/infiniband/ulp/rmr/rmr-map-mgmt.c | 933 +++++
5 files changed, 4949 insertions(+)
create mode 100644 drivers/infiniband/ulp/rmr/rmr-clt-stats.c
create mode 100644 drivers/infiniband/ulp/rmr/rmr-clt-trace.c
create mode 100644 drivers/infiniband/ulp/rmr/rmr-clt-trace.h
create mode 100644 drivers/infiniband/ulp/rmr/rmr-clt.c
create mode 100644 drivers/infiniband/ulp/rmr/rmr-map-mgmt.c
diff --git a/drivers/infiniband/ulp/rmr/rmr-clt-stats.c b/drivers/infiniband/ulp/rmr/rmr-clt-stats.c
new file mode 100644
index 000000000000..83a4089defc0
--- /dev/null
+++ b/drivers/infiniband/ulp/rmr/rmr-clt-stats.c
@@ -0,0 +1,29 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Reliable multicast over RTRS (RMR)
+ *
+ * Copyright (c) 2026 IONOS SE
+ */
+
+#undef pr_fmt
+#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
+
+#include "rmr-clt.h"
+
+int rmr_clt_reset_read_retries(struct rmr_clt_stats *stats, bool enable)
+{
+ if (unlikely(!enable))
+ return -EINVAL;
+
+ atomic_set(&stats->read_retries, 0);
+
+ return 0;
+}
+
+ssize_t rmr_clt_stats_read_retries_to_str(struct rmr_clt_stats *stats,
+					   char *page)
+{
+	return sysfs_emit(page, "%d\n",
+			  atomic_read(&stats->read_retries));
+}
+
diff --git a/drivers/infiniband/ulp/rmr/rmr-clt-trace.c b/drivers/infiniband/ulp/rmr/rmr-clt-trace.c
new file mode 100644
index 000000000000..2e6d9adee7c8
--- /dev/null
+++ b/drivers/infiniband/ulp/rmr/rmr-clt-trace.c
@@ -0,0 +1,11 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Reliable multicast over RTRS (RMR)
+ *
+ * Copyright (c) 2026 IONOS SE
+ */
+#include "rmr-clt.h"
+
+#define CREATE_TRACE_POINTS
+#include "rmr-clt-trace.h"
+
diff --git a/drivers/infiniband/ulp/rmr/rmr-clt-trace.h b/drivers/infiniband/ulp/rmr/rmr-clt-trace.h
new file mode 100644
index 000000000000..1d9a511dc763
--- /dev/null
+++ b/drivers/infiniband/ulp/rmr/rmr-clt-trace.h
@@ -0,0 +1,110 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+/*
+ * Reliable multicast over RTRS (RMR)
+ *
+ * Copyright (c) 2026 IONOS SE
+ */
+#undef TRACE_SYSTEM
+#define TRACE_SYSTEM rmr_clt
+
+#if !defined(_TRACE_RMR_CLT_H) || defined(TRACE_HEADER_MULTI_READ)
+#define _TRACE_RMR_CLT_H
+
+#include <linux/tracepoint.h>
+
+struct rmr_clt_pool_sess;
+
+TRACE_DEFINE_ENUM(RMR_CLT_POOL_SESS_CREATED);
+TRACE_DEFINE_ENUM(RMR_CLT_POOL_SESS_NORMAL);
+TRACE_DEFINE_ENUM(RMR_CLT_POOL_SESS_FAILED);
+TRACE_DEFINE_ENUM(RMR_CLT_POOL_SESS_RECONNECTING);
+TRACE_DEFINE_ENUM(RMR_CLT_POOL_SESS_REMOVING);
+
+#define show_pool_sess_state(x) \
+ __print_symbolic(x, \
+ { RMR_CLT_POOL_SESS_CREATED, "CREATED" }, \
+ { RMR_CLT_POOL_SESS_NORMAL, "NORMAL" }, \
+ { RMR_CLT_POOL_SESS_FAILED, "FAILED" }, \
+ { RMR_CLT_POOL_SESS_RECONNECTING, "RECONNECTING" }, \
+ { RMR_CLT_POOL_SESS_REMOVING, "REMOVING" })
+
+TRACE_EVENT(pool_sess_change_state,
+ TP_PROTO(struct rmr_clt_pool_sess *pool_sess,
+ int newstate,
+ int oldstate,
+ int changed),
+
+ TP_ARGS(pool_sess, newstate, oldstate, changed),
+
+ TP_STRUCT__entry(
+ __string(sessname, pool_sess->sessname)
+ __field(int, newstate)
+ __field(int, oldstate)
+ __field(int, changed)
+ ),
+
+ TP_fast_assign(
+ __assign_str(sessname);
+ __entry->newstate = newstate;
+ __entry->oldstate = oldstate;
+ __entry->changed = changed;
+ ),
+
+ TP_printk("RMR-CLT: sessname=%s newstate='%s' oldstate='%s' state-changed='%d'",
+ __get_str(sessname),
+ show_pool_sess_state(__entry->newstate),
+ show_pool_sess_state(__entry->oldstate),
+ __entry->changed
+ )
+);
+
+DECLARE_EVENT_CLASS(rtrs_clt_request_class,
+ TP_PROTO(int dir, struct rmr_clt_sess_iu *sess_iu),
+
+ TP_ARGS(dir, sess_iu),
+
+ TP_STRUCT__entry(
+ __field(int, dir)
+ __array(char, sessname, NAME_MAX)
+ __field(void *, rtrs)
+ __field(void *, clt_sess)
+ ),
+
+ TP_fast_assign(
+ struct rmr_clt_pool_sess *pool_sess = sess_iu->pool_sess;
+ struct rmr_clt_sess *clt_sess = pool_sess->clt_sess;
+
+ __entry->dir = dir;
+ memcpy(__entry->sessname, pool_sess->sessname, NAME_MAX);
+ __entry->rtrs = clt_sess->rtrs;
+ __entry->clt_sess = clt_sess;
+ ),
+
+ TP_printk("rtrs clt request: sessname=%s dir=%s rtrs=%p clt_sess=%p",
+ __entry->sessname,
+ __print_symbolic(__entry->dir,
+ { READ, "READ" },
+ { WRITE, "WRITE" }),
+ __entry->rtrs,
+ __entry->clt_sess
+ )
+);
+
+#define DEFINE_RTRS_CLT_EVENT(name) \
+DEFINE_EVENT(rtrs_clt_request_class, name, \
+ TP_PROTO(int dir, struct rmr_clt_sess_iu *sess_iu), \
+ TP_ARGS(dir, sess_iu))
+
+DEFINE_RTRS_CLT_EVENT(send_usr_msg);
+DEFINE_RTRS_CLT_EVENT(retry_failed_read);
+DEFINE_RTRS_CLT_EVENT(rmr_clt_request);
+DEFINE_RTRS_CLT_EVENT(rmr_clt_cmd_with_rsp);
+DEFINE_RTRS_CLT_EVENT(send_map_update);
+
+#endif /* _TRACE_RMR_CLT_H */
+
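+/*
+ * This header is included a second time by trace/define_trace.h (with
+ * CREATE_TRACE_POINTS set in rmr-clt-trace.c) to emit the tracepoint
+ * definitions; TRACE_INCLUDE_PATH/FILE tell it where to find this file.
+ */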
+#undef TRACE_INCLUDE_PATH
+#define TRACE_INCLUDE_PATH .
+#define TRACE_INCLUDE_FILE rmr-clt-trace
+#include <trace/define_trace.h>
+
diff --git a/drivers/infiniband/ulp/rmr/rmr-clt.c b/drivers/infiniband/ulp/rmr/rmr-clt.c
new file mode 100644
index 000000000000..33e4b6d84b0b
--- /dev/null
+++ b/drivers/infiniband/ulp/rmr/rmr-clt.c
@@ -0,0 +1,3866 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Reliable multicast over RTRS (RMR)
+ *
+ * Copyright (c) 2026 IONOS SE
+ */
+
+#undef pr_fmt
+#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
+
+#include <linux/module.h>
+#include <linux/blkdev.h>
+#include <linux/slab.h>
+#include <linux/wait.h>
+#include <linux/sched.h>
+
+#include "rmr-clt.h"
+#include "rmr-clt-trace.h"
+
+MODULE_AUTHOR("The RMR and BRMR developers");
+MODULE_DESCRIPTION("RMR Client");
+MODULE_VERSION(RMR_VER_STRING);
+MODULE_LICENSE("GPL");
+
+#define RMR_CLT_SEND_MSG_TIMEOUT_MS 30000
+
+static void retry_failed_read(struct work_struct *work);
+static DEFINE_MUTEX(g_sess_lock);
+static LIST_HEAD(g_sess_list);
+
+static bool rmr_get_clt_pool(struct rmr_clt_pool *clt_pool)
+{
+ pr_debug("pool %s, before inc refcount %d\n",
+ clt_pool->pool->poolname, refcount_read(&clt_pool->refcount));
+ return refcount_inc_not_zero(&clt_pool->refcount);
+}
+
+static struct rmr_clt_pool *rmr_find_and_get_clt_pool(const char *poolname)
+{
+ struct rmr_pool *pool;
+ struct rmr_clt_pool *clt_pool;
+
+ mutex_lock(&pool_mutex);
+ pool = rmr_find_pool(poolname);
+ if (!pool) {
+ clt_pool = ERR_PTR(-ENOENT);
+ goto out;
+ }
+
+ clt_pool = (struct rmr_clt_pool *)pool->priv;
+ if (!rmr_get_clt_pool(clt_pool))
+ clt_pool = ERR_PTR(-EINVAL);
+
+out:
+ mutex_unlock(&pool_mutex);
+ return clt_pool;
+}
+
+void rmr_put_clt_pool(struct rmr_clt_pool *clt_pool)
+{
+ struct rmr_pool *pool = clt_pool->pool;
+
+ might_sleep();
+
+ pr_debug("clt pool %s, before dec refcnt %d\n",
+ (pool ? pool->poolname : "(empty)"), refcount_read(&clt_pool->refcount));
+ if (refcount_dec_and_test(&clt_pool->refcount)) {
+ destroy_workqueue(clt_pool->recover_wq);
+ mutex_destroy(&clt_pool->io_freeze_lock);
+ mutex_destroy(&clt_pool->clt_pool_lock);
+
+ if (pool) {
+ pr_info("clt: destroy pool %s\n", pool->poolname);
+ free_pool(pool);
+ }
+
+ kfree(clt_pool);
+ }
+}
+
+static inline int rmr_clt_sess_get(struct rmr_clt_sess *sess)
+{
+ return kref_get_unless_zero(&sess->kref);
+}
+
+static void rmr_clt_sess_release(struct kref *kref)
+{
+ struct rmr_clt_sess *clt_sess;
+
+ clt_sess = container_of(kref, struct rmr_clt_sess, kref);
+
+ mutex_lock(&g_sess_lock);
+
+ rmr_clt_destroy_clt_sess_sysfs_files(clt_sess);
+
+ pr_info("close rtrs for session %s\n", clt_sess->sessname);
+ rtrs_clt_close(clt_sess->rtrs);
+ list_del(&clt_sess->g_list);
+ kfree(clt_sess);
+
+ mutex_unlock(&g_sess_lock);
+}
+
+void rmr_clt_sess_put(struct rmr_clt_sess *sess)
+{
+ kref_put(&sess->kref, rmr_clt_sess_release);
+}
+
+static const char *rmr_get_clt_pool_state_name(enum rmr_clt_pool_state state)
+{
+ switch (state) {
+ case RMR_CLT_POOL_STATE_JOINED: return "RMR_CLT_POOL_STATE_JOINED";
+ case RMR_CLT_POOL_STATE_IN_USE: return "RMR_CLT_POOL_STATE_IN_USE";
+
+ default: return "Unknown state";
+ }
+}
+
+static void rmr_clt_dump_state(struct rmr_clt_pool *rmr_clt_pool)
+{
+ char current_state[1024] = {0};
+ int i, n = 0, len = sizeof(current_state);
+
+ for (i = 0; i < RMR_CLT_POOL_STATE_MAX; i++) {
+ enum rmr_clt_pool_state state = (enum rmr_clt_pool_state)i;
+
+ if (test_bit(state, &rmr_clt_pool->state))
+ n += scnprintf(current_state + n, len - n, "%s, ",
+ rmr_get_clt_pool_state_name(state));
+ }
+
+ pr_info("%s: RMR client pool current state: %s\n", __func__, current_state);
+}
+
+/**
+ * rmr_clt_change_pool_state() - Change clt pool state
+ *
+ * @rmr_clt_pool: Client pool whose state is to be changed
+ * @new_state: New state to set
+ * @set: Informs whether to set or clear the given @new_state
+ */
+void rmr_clt_change_pool_state(struct rmr_clt_pool *rmr_clt_pool,
+ enum rmr_clt_pool_state new_state, bool set)
+{
+ if (set) {
+ set_bit(new_state, &rmr_clt_pool->state);
+ pr_info("%s: state %s set\n",
+ __func__, rmr_get_clt_pool_state_name(new_state));
+ } else {
+ clear_bit(new_state, &rmr_clt_pool->state);
+ pr_info("%s: state %s cleared\n",
+ __func__, rmr_get_clt_pool_state_name(new_state));
+ }
+
+ rmr_clt_dump_state(rmr_clt_pool);
+}
+
+/*
+ * send_map_get_version() - Send a map get version command
+ *
+ * @pool_sess: pool session where to send the message
+ *
+ * Description:
+ * Ask the storage node to send back its map_version.
+ *
+ * Return:
+ * 0 on success
+ * Negative error in case of failure
+ */
+
+/**
+ * rmr_clt_md_update() - Update the client (non-sync) pool metadata
+ *
+ * @pool: Pool whose cached metadata should be refreshed
+ */
+static void rmr_clt_md_update(struct rmr_pool *pool)
+{
+ struct rmr_pool_md *clt_md = &pool->pool_md;
+
+ if (pool->sync)
+ return;
+
+ clt_md->map_ver = pool->map_ver;
+}
+
+#if 0
+static int send_map_set_version(struct rmr_clt_pool_sess *pool_sess, u64 ver)
+{
+ struct rmr_msg_pool_cmd msg = {};
+ struct rmr_pool *pool = pool_sess->pool;
+ int err;
+
+ rmr_clt_init_cmd(pool, &msg);
+ msg.cmd_type = RMR_CMD_MAP_SET_VER;
+ msg.set_map_ver_cmd.map_ver = ver;
+
+ err = rmr_clt_pool_send_cmd(pool_sess, &msg, WAIT);
+ if (err) {
+ pr_err("%s: For sess %s, %s failed with err %d\n",
+ __func__, pool_sess->sessname, rmr_get_cmd_name(msg.cmd_type), err);
+ }
+ return err;
+}
+
+/**
+ * rmr_clt_coordinate_discard() - Coordinate the discard_entries flag
+ *
+ * @pool: the client pool
+ * @cmd_type: the discard command type to send
+ * @member_id: member id of the source node
+ *
+ * Description:
+ * This function sends discard request to all normal pool sessions of the pool.
+ * It is to solve the case where network is partitioned between the server nodes
+ * and only the client connects those partitions. Any request that failed on a session
+ * would fail this call.
+ *
+ * TODO: To address the network partitions (including the client), wait for consistency
+ * protocols.
+ *
+ * Return:
+ * 0 on success
+ * Negative error in case of failure
+ *
+ * Pre-requisite: rcu read lock should be held by caller
+ */
+static int rmr_clt_coordinate_discard(struct rmr_pool *pool, u8 cmd_type, u8 member_id)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+ int err = 0;
+
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+		/*
+		 * If the pool session state is not normal, the dirty maps of that
+		 * pool are likely corrupted. Don't bother to send the discards.
+		 */
+ if (atomic_read(&pool_sess->state) != RMR_CLT_POOL_SESS_NORMAL)
+ continue;
+
+ pr_info("%s: send discards to (pool_sess %s: %d) with member_id %u\n",
+ __func__, pool_sess->sessname, pool_sess->member_id, member_id);
+
+ /* Send discard request to the pool session. */
+ err = send_discard(pool_sess, cmd_type, member_id);
+ if (err) {
+ pr_err("%s: Failed discard request on sess %s for member_id %u\n",
+ __func__, pool_sess->sessname, member_id);
+ return err;
+ }
+ }
+
+ return err;
+}
+
+static int rmr_clt_handle_discard(struct rmr_pool *pool)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_dirty_id_map *map;
+ int idx, ret, err = 0;
+ u64 map_ver;
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+
+	/* Find out if there are pending discard requests on the server side */
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ ret = send_map_get_version(pool_sess, &map_ver);
+ if (ret)
+ continue;
+
+		/*
+		 * When a disk replacement happens at the storage node, the pserver
+		 * will set all map entries of that server to dirty.
+		 */
+ if (RMR_STORE_IS_REPLACE(map_ver)) {
+ map = rmr_pool_find_map(pool, pool_sess->member_id);
+ if (!map) {
+ pr_err("The clt pool %s cannot find map for member_id %u\n",
+ pool->poolname, pool_sess->member_id);
+ err = -EINVAL;
+ goto out;
+ }
+
+ rmr_map_set_dirty_all(map, MAP_NO_FILTER);
+
+ /* Check any normal pool session failed to receive discards */
+ err = rmr_clt_coordinate_discard(pool, RMR_CMD_SEND_DISCARD,
+ pool_sess->member_id);
+ if (err) {
+ pr_err("%s: Failed to coordinate discard state for member_id %u\n",
+ __func__, pool_sess->member_id);
+ goto out;
+ }
+
+ /* update the map version */
+ err = send_map_set_version(pool_sess, RMR_STORE_UNSET_REPLACE(map_ver));
+ if (err) {
+ pr_err("%s: Failed to reset map version for %s\n",
+ __func__, pool_sess->sessname);
+ goto out;
+ }
+
+ /* Everyone knows about the discarded entries now. */
+ err = rmr_clt_coordinate_discard(pool, RMR_CMD_DISCARD_CLEAR_FLAG,
+ pool_sess->member_id);
+ if (err) {
+ pr_err("%s: Failed to clear discard flag for S%u\n",
+ __func__, pool_sess->member_id);
+ goto out;
+ }
+ }
+ }
+
+out:
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+ return err;
+}
+#endif
+
+static int rmr_clt_start_send_md(struct rmr_pool *pool);
+
+/**
+ * recover_work() - A work thread, which performs a number of tasks at regular intervals
+ *
+ * @work: The work struct holding the data
+ *
+ * Description:
+ * Every client pool has its own work thread. It performs the following 3 tasks.
+ * 1) Pool sessions in NORMAL state that have dirty map entries associated with them
+ *    are checked, and if the entries are cleared on the particular storage node,
+ *    they are deleted from the pserver as well.
+ * 2) If the pool session state is FAILED but the network state (clt session) is
+ *    connected, a store check message is sent to the pool session. The storage node
+ *    will confirm with the backend whether IOs can be sent or not.
+ * 3) Send the client pool metadata to the servers.
+ */
+void recover_work(struct work_struct *work)
+{
+ struct rmr_pool *pool;
+ struct rmr_clt_pool *clt_pool;
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_pool_md *clt_md;
+ int index, lock_idx = 0;
+
+ clt_pool = container_of(to_delayed_work(work), struct rmr_clt_pool, recover_dwork);
+ pool = clt_pool->pool;
+
+ pr_debug("check map for pool %s started...\n", pool->poolname);
+
+ lock_idx = srcu_read_lock(&pool->sess_list_srcu);
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ struct rmr_clt_sess *clt_sess = pool_sess->clt_sess;
+
+ pr_debug("pool %s sess %s sess->member_id %d sess->state %d\n",
+ pool->poolname, pool_sess->sessname,
+ pool_sess->member_id, atomic_read(&pool_sess->state));
+
+ clt_md = &pool->pool_md;
+ index = rmr_pool_find_md(clt_md, pool_sess->member_id, false);
+ if (index < 0) {
+ pr_debug("%s failed to find pool_sess %u\n",
+ __func__, pool_sess->member_id);
+ continue;
+ }
+ if (pool_sess->maintenance_mode)
+ goto pool_sess_state_check;
+
+ if (atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_NORMAL) {
+ struct rmr_dirty_id_map *map;
+
+ map = rmr_pool_find_map(pool, pool_sess->member_id);
+ if (!map) {
+ pr_debug("pool %s no map found for member_id %u\n",
+ pool->poolname, pool_sess->member_id);
+ continue;
+ }
+ if (!rmr_map_empty(map)) {
+ pr_debug("pool %s sess %s map is not empty, check stg map...\n",
+ pool->poolname, pool_sess->sessname);
+ send_map_check(pool_sess);
+ }
+ }
+pool_sess_state_check:
+ if (atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_FAILED &&
+ clt_sess->state == RMR_CLT_SESS_CONNECTED) {
+ pr_debug("pool %s sess %s try pool sess recover\n",
+ pool->poolname, pool_sess->sessname);
+ send_store_check(pool_sess);
+ }
+ }
+ srcu_read_unlock(&pool->sess_list_srcu, lock_idx);
+
+ rmr_clt_md_update(pool);
+ /* If the send fails, wait for the next update. */
+ rmr_clt_start_send_md(pool);
+
+ pr_debug("check map for pool %s done. schedule next one.\n", pool->poolname);
+
+ queue_delayed_work(clt_pool->recover_wq, &clt_pool->recover_dwork,
+ msecs_to_jiffies(RMR_RECOVER_INTERVAL_MS));
+}
+
+static int init_clt_pool(struct rmr_clt_pool *clt_pool)
+{
+	clt_pool->pcpu_sess = alloc_percpu(typeof(*clt_pool->pcpu_sess));
+	if (unlikely(!clt_pool->pcpu_sess))
+		return -ENOMEM;
+
+	return 0;
+}
+
+static void destroy_clt_pool(struct rmr_pool *pool)
+{
+ int i;
+ struct rmr_clt_pool *clt_pool;
+ struct rmr_dirty_id_map *map;
+ struct rmr_dirty_id_map *maplist = NULL;
+
+ clt_pool = (struct rmr_clt_pool *)pool->priv;
+ if (clt_pool) {
+ free_percpu(clt_pool->pcpu_sess);
+ clt_pool->pcpu_sess = NULL;
+ }
+
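+	/*
+	 * Unpublish all maps under maps_lock first; synchronize_srcu() then
+	 * lets in-flight SRCU readers drain before the unlinked maps are
+	 * destroyed below.
+	 */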
+ mutex_lock(&pool->maps_lock);
+ for (i = 0; i < pool->maps_cnt; i++) {
+ map = rcu_dereference_protected(pool->maps[i],
+ lockdep_is_held(&pool->maps_lock));
+ if (WARN_ON(!map))
+ continue;
+ rcu_assign_pointer(pool->maps[i], NULL);
+ map->next = maplist;
+ maplist = map;
+ }
+ pool->maps_cnt = 0;
+
+ if (maplist)
+ synchronize_srcu(&pool->map_srcu);
+
+ mutex_unlock(&pool->maps_lock);
+
+ rmr_maplist_destroy(maplist);
+}
+
+static void rmr_put_sess_iu(struct rmr_clt_pool_sess *pool_sess,
+ struct rmr_clt_sess_iu *sess_iu);
+
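+/*
+ * An rmr_iu tracks a single I/O across multiple pool sessions: it keeps one
+ * sess_iu (with its rtrs permit) per session on sess_list, and dropping the
+ * last reference releases all attached sess_ius along with it.
+ */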
+static struct rmr_iu *
+rmr_alloc_iu(void)
+{
+ struct rmr_iu *iu;
+
+ iu = kzalloc(sizeof(*iu), GFP_KERNEL);
+ if (!iu)
+ return NULL;
+ INIT_LIST_HEAD(&iu->sess_list);
+ iu->num_sessions = 0;
+ refcount_set(&iu->ref, 1);
+ return iu;
+}
+
+void rmr_get_iu(struct rmr_iu *iu)
+{
+ refcount_inc(&iu->ref);
+}
+
+void rmr_put_iu(struct rmr_iu *iu)
+{
+ struct rmr_clt_sess_iu *sess_iu, *tmp;
+
+ if (refcount_dec_and_test(&iu->ref)) {
+ list_for_each_entry_safe(sess_iu, tmp,
+ &iu->sess_list, entry) {
+ if (!list_empty(&sess_iu->entry))
+ list_del_init(&sess_iu->entry);
+ rmr_put_sess_iu(sess_iu->pool_sess, sess_iu);
+ }
+ kfree(iu);
+ }
+}
+
+void rmr_clt_free_pool_sess(struct rmr_clt_pool_sess *pool_sess)
+{
+ struct rmr_clt_pool *clt_pool;
+ struct rmr_clt_sess *clt_sess = pool_sess->clt_sess;
+
+ clt_pool = (struct rmr_clt_pool *)pool_sess->pool->priv;
+
+ if (!list_empty(&pool_sess->clt_sess_entry)) {
+ mutex_lock(&clt_sess->lock);
+ list_del(&pool_sess->clt_sess_entry);
+ mutex_unlock(&clt_sess->lock);
+ }
+
+ pr_info("before free pool_sess %s, clt_sess refcount=%d\n",
+ pool_sess->sessname, kref_read(&clt_sess->kref));
+
+ kfree(pool_sess);
+}
+
+void rmr_clt_put_pool(struct rmr_pool *pool)
+{
+ struct rmr_clt_pool *clt_pool = (struct rmr_clt_pool *)pool->priv;
+
+ rmr_put_clt_pool(clt_pool);
+}
+EXPORT_SYMBOL(rmr_clt_put_pool);
+
+/**
+ * rmr_clt_open() - Open a client for use
+ *
+ * @priv: private data for the user
+ * @link_ev: holds the link event callback
+ * @poolname: name of the pool to open
+ *
+ * Description:
+ * Open an RMR pool for the user to use. The rmr pool must have at least one session.
+ * A single pool can be opened and used by only a single user.
+ *
+ * Return:
+ * Pointer to the rmr pool opened on success, ERR_PTR() on failure.
+ */
+struct rmr_pool *rmr_clt_open(void *priv, rmr_clt_ev_fn *link_ev, const char *poolname)
+{
+ struct rmr_clt_pool *clt_pool;
+ int err;
+
+ clt_pool = rmr_find_and_get_clt_pool(poolname);
+ if (IS_ERR(clt_pool)) {
+ pr_err("RMR client pool '%s' is not found\n", poolname);
+ err = PTR_ERR(clt_pool);
+ goto err_out;
+ }
+
+	if (!mutex_trylock(&clt_pool->clt_pool_lock)) {
+		pr_err("RMR client pool '%s' is busy, recovery in progress\n", poolname);
+		err = -EBUSY;
+		/* the lock was not taken, do not unlock it on the way out */
+		goto put_pool;
+	}
+	if (test_bit(RMR_CLT_POOL_STATE_IN_USE, &clt_pool->state)) {
+		pr_err("RMR client pool '%s' is already in use\n", poolname);
+		err = -EBUSY;
+		goto put_err;
+	}
+
+ if (!test_bit(RMR_CLT_POOL_STATE_JOINED, &clt_pool->state)) {
+ pr_err("RMR client pool '%s' has no sessions open\n", poolname);
+ err = -ENOENT;
+ goto put_err;
+ }
+
+ clt_pool->link_ev = link_ev;
+ clt_pool->priv = priv;
+
+ err = init_clt_pool(clt_pool);
+ if (unlikely(err)) {
+ pr_err("RMR client pool '%s' failed to initialize: %d\n", poolname, err);
+ goto put_err;
+ }
+
+ rmr_clt_change_pool_state(clt_pool, RMR_CLT_POOL_STATE_IN_USE, true);
+
+ mutex_unlock(&clt_pool->clt_pool_lock);
+ return clt_pool->pool;
+
+put_err:
+	mutex_unlock(&clt_pool->clt_pool_lock);
+put_pool:
+	rmr_put_clt_pool(clt_pool);
+err_out:
+ return ERR_PTR(err);
+}
+EXPORT_SYMBOL(rmr_clt_open);
+
+void rmr_clt_close(struct rmr_pool *pool)
+{
+ struct rmr_clt_pool *clt_pool = (struct rmr_clt_pool *)pool->priv;
+
+ mutex_lock(&clt_pool->clt_pool_lock);
+ rmr_clt_change_pool_state(clt_pool, RMR_CLT_POOL_STATE_IN_USE, false);
+
+ pr_info("%s: RMR client close called for pool %s\n", __func__, pool->poolname);
+
+ /*
+ * Freeze I/O.
+ * Degrade ref count to the usual model with a single shared
+ * atomic_t counter
+ */
+ rmr_clt_pool_io_freeze(clt_pool);
+ pr_info("pool %s wait for inflight io to complete\n", clt_pool->pool->poolname);
+
+ /* Wait for all completion */
+ rmr_clt_pool_io_wait_complete(clt_pool);
+
+ pr_info("pool %s inflight io completed\n", clt_pool->pool->poolname);
+
+ clt_pool->link_ev = NULL;
+ clt_pool->priv = NULL;
+
+ /* Unfreeze and Resurrect */
+ rmr_clt_pool_io_unfreeze(clt_pool);
+
+ mutex_unlock(&clt_pool->clt_pool_lock);
+
+ rmr_put_clt_pool(clt_pool);
+}
+EXPORT_SYMBOL(rmr_clt_close);
+
+void *rmr_clt_get_priv(struct rmr_pool *pool)
+{
+ struct rmr_clt_pool *clt_pool;
+
+ clt_pool = (struct rmr_clt_pool *)pool->priv;
+ if (clt_pool)
+ return clt_pool->priv;
+
+ return NULL;
+}
+EXPORT_SYMBOL(rmr_clt_get_priv);
+
+static struct rmr_clt_sess *alloc_clt_sess(const char *sessname)
+{
+ struct rmr_clt_sess *sess;
+
+ sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE);
+ if (unlikely(!sess)) {
+ pr_err("Failed to create session %s,"
+ " allocating session struct failed\n",
+ sessname);
+ return ERR_PTR(-ENOMEM);
+ }
+ strscpy(sess->sessname, sessname, sizeof(sess->sessname));
+ mutex_init(&sess->lock);
+ INIT_LIST_HEAD(&sess->pool_sess_list);
+ kref_init(&sess->kref);
+ sess->state = RMR_CLT_SESS_DISCONNECTED;
+
+ return sess;
+}
+
+static struct rmr_clt_pool_sess *alloc_pool_sess(struct rmr_pool *pool,
+ struct rmr_clt_sess *clt_sess)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+
+ pool_sess = kzalloc_node(sizeof(*pool_sess), GFP_KERNEL, NUMA_NO_NODE);
+ if (unlikely(!pool_sess)) {
+ pr_err("Failed to allocate session for pool %s\n", pool->poolname);
+ return ERR_PTR(-ENOMEM);
+ }
+
+ strscpy(pool_sess->sessname, clt_sess->sessname, NAME_MAX);
+ INIT_LIST_HEAD(&pool_sess->entry);
+ INIT_LIST_HEAD(&pool_sess->clt_sess_entry);
+ pool_sess->pool = pool;
+ pool_sess->clt_sess = clt_sess;
+ pool_sess->maintenance_mode = false;
+ atomic_set(&pool_sess->state, RMR_CLT_POOL_SESS_CREATED);
+
+ return pool_sess;
+}
+
+/*
+ * Checks if the session already exists (search by session name)
+ * Returns TRUE if session found, FALSE otherwise.
+ */
+static bool __find_sess_by_name(struct rmr_pool *pool, const char *sessname)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+ int idx;
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ if (!strcmp(sessname, pool_sess->sessname)) {
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+ return true;
+ }
+ }
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+ return false;
+}
+
+/**
+ * __find_sess_by_member_id() - Find and return pool_sess with a given member_id
+ *
+ * @pool: RMR pool to search pool_sess in
+ * @member_id: member ID to search
+ *
+ * Return:
+ * Pointer to rmr_clt_pool_sess on success
+ * NULL if no pool session exists with the given member_id
+ *
+ * Context:
+ * The caller should hold srcu_read_lock
+ */
+static struct rmr_clt_pool_sess *__find_sess_by_member_id(struct rmr_pool *pool, u8 member_id)
+{
+ struct rmr_clt_pool_sess *pool_sess = NULL, *tmp_pool_sess;
+
+ list_for_each_entry_srcu(tmp_pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ if (member_id == tmp_pool_sess->member_id) {
+ pool_sess = tmp_pool_sess;
+ break;
+ }
+ }
+
+ return pool_sess;
+}
+
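+/*
+ * Allowed pool session state transitions, as implemented below
+ * (REMOVING is terminal):
+ *   CREATED      -> NORMAL, RECONNECTING, FAILED, REMOVING
+ *   RECONNECTING -> NORMAL, FAILED, REMOVING
+ *   FAILED       -> RECONNECTING (non-sync), NORMAL (sync only), REMOVING
+ *   NORMAL       -> FAILED, REMOVING; RECONNECTING only in maintenance mode
+ */
+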
+/**
+ * pool_sess_change_state() - Change pool session state
+ *
+ * @pool_sess: Pool session whose state is to be changed
+ * @newstate: New state which is to be set
+ *
+ * Description:
+ * Pool session states decide a number of crucial things.
+ * Where the IOs can be sent, which node has an outdated map, etc.
+ * As such, state transitions are important and are tightly controlled:
+ * all of them should happen through this function.
+ *
+ * Return:
+ * True in case the state was changed
+ * False in case the state was not changed
+ */
+bool pool_sess_change_state(struct rmr_clt_pool_sess *pool_sess,
+ enum rmr_clt_pool_sess_state newstate)
+{
+ bool changed = false;
+ int oldstate = atomic_read(&pool_sess->state);
+
+ if (WARN_ON(oldstate == RMR_CLT_POOL_SESS_REMOVING))
+ goto out;
+
+ switch (newstate) {
+ case RMR_CLT_POOL_SESS_NORMAL:
+ if (pool_sess->maintenance_mode)
+ break;
+ /*
+ * Non-sync sessions must pass through RECONNECTING before
+ * reaching NORMAL so that a map update can take place first.
+ * Sync sessions skip RECONNECTING entirely and go FAILED→NORMAL
+ * directly.
+ */
+ if (!rmr_clt_sess_is_sync(pool_sess)) {
+ if (WARN_ON(oldstate == RMR_CLT_POOL_SESS_FAILED))
+ break;
+ if (oldstate == RMR_CLT_POOL_SESS_CREATED ||
+ oldstate == RMR_CLT_POOL_SESS_RECONNECTING)
+ changed = atomic_try_cmpxchg(&pool_sess->state,
+ &oldstate,
+ newstate);
+ } else {
+ if (oldstate == RMR_CLT_POOL_SESS_CREATED ||
+ oldstate == RMR_CLT_POOL_SESS_FAILED ||
+ oldstate == RMR_CLT_POOL_SESS_RECONNECTING)
+ changed = atomic_try_cmpxchg(&pool_sess->state,
+ &oldstate,
+ newstate);
+ }
+ break;
+ case RMR_CLT_POOL_SESS_RECONNECTING:
+ /*
+ * Sync sessions never need a map update and must not enter
+ * RECONNECTING.
+ */
+ if (WARN_ON(rmr_clt_sess_is_sync(pool_sess) &&
+ !pool_sess->maintenance_mode))
+ break;
+ if (oldstate == RMR_CLT_POOL_SESS_FAILED ||
+ oldstate == RMR_CLT_POOL_SESS_CREATED ||
+ (oldstate == RMR_CLT_POOL_SESS_NORMAL && pool_sess->maintenance_mode))
+ changed = atomic_try_cmpxchg(&pool_sess->state,
+ &oldstate,
+ newstate);
+ break;
+ case RMR_CLT_POOL_SESS_FAILED:
+ changed = atomic_try_cmpxchg(&pool_sess->state,
+ &oldstate,
+ newstate);
+		/*
+		 * TODO:
+		 * We should really be updating the map version together with
+		 * the state, or before it.
+		 */
+ if (changed && oldstate != RMR_CLT_POOL_SESS_FAILED)
+ pool_sess->pool->map_ver++;
+ break;
+ case RMR_CLT_POOL_SESS_REMOVING:
+ changed = atomic_try_cmpxchg(&pool_sess->state,
+ &oldstate,
+ newstate);
+ break;
+ default:
+ pr_err("%s: Unknown state %d\n", __func__, newstate);
+ break;
+ }
+
+ if (changed && !rmr_clt_sess_is_sync(pool_sess)) {
+ if (newstate == RMR_CLT_POOL_SESS_NORMAL) {
+ /*
+ * Entering NORMAL: this session is no longer the last
+ * authoritative holder of the dirty map.
+ */
+ pool_sess->was_last_authoritative = false;
+ atomic_inc(&pool_sess->pool->normal_count);
+ } else if (oldstate == RMR_CLT_POOL_SESS_NORMAL) {
+ /*
+ * Leaving NORMAL via FAILED or maintenance-mode
+ * RECONNECTING: decrement the count of NORMAL sessions.
+ * If this was the last one, mark it as authoritative so
+ * that recovery can enable it directly (without a map
+ * update) when it comes back — its dirty map was the last
+ * complete one the pool had.
+ *
+ * REMOVING is not marked authoritative: a deliberate
+ * removal (delete or disassemble) is not an uncontrolled
+ * failure. On reassembly the leg goes through the full
+ * map update path and does not need the direct-enable
+ * shortcut.
+ */
+ if (newstate == RMR_CLT_POOL_SESS_FAILED ||
+ (newstate == RMR_CLT_POOL_SESS_RECONNECTING &&
+ pool_sess->maintenance_mode)) {
+ if (atomic_dec_and_test(&pool_sess->pool->normal_count))
+ pool_sess->was_last_authoritative = true;
+ } else {
+ /* REMOVING */
+ atomic_dec(&pool_sess->pool->normal_count);
+ }
+ }
+ }
+
+out:
+
+ trace_pool_sess_change_state(pool_sess, newstate, oldstate, changed);
+
+ return changed;
+}
+
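+/*
+ * io_freeze is a nesting counter: only the first freeze kills the percpu
+ * inflight ref and only the last unfreeze re-inits it, so freeze/unfreeze
+ * pairs may nest.
+ */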
+void rmr_clt_pool_io_freeze(struct rmr_clt_pool *clt_pool)
+{
+ struct rmr_pool *pool = clt_pool->pool;
+
+ mutex_lock(&clt_pool->io_freeze_lock);
+ if (atomic_inc_return(&clt_pool->io_freeze) == 1)
+ percpu_ref_kill(&pool->ids_inflight_ref);
+ mutex_unlock(&clt_pool->io_freeze_lock);
+}
+
+void rmr_clt_pool_io_unfreeze(struct rmr_clt_pool *clt_pool)
+{
+ struct rmr_pool *pool = clt_pool->pool;
+
+ mutex_lock(&clt_pool->io_freeze_lock);
+ if (atomic_dec_return(&clt_pool->io_freeze) == 0) {
+ reinit_completion(&pool->complete_done);
+ percpu_ref_reinit(&pool->ids_inflight_ref);
+
+ wake_up_all(&clt_pool->map_update_wq);
+ }
+ mutex_unlock(&clt_pool->io_freeze_lock);
+}
+
+void rmr_clt_pool_io_wait_complete(struct rmr_clt_pool *clt_pool)
+{
+ struct rmr_pool *pool = clt_pool->pool;
+
+ wait_for_completion(&pool->complete_done);
+}
+
+/* TODO (am): what kind of locking is required for this? */
+static void set_pool_sess_states_to_failed(struct rmr_clt_sess *clt_sess)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+
+ mutex_lock(&clt_sess->lock);
+
+ list_for_each_entry(pool_sess, &clt_sess->pool_sess_list, clt_sess_entry) {
+ if (pool_sess_change_state(pool_sess, RMR_CLT_POOL_SESS_FAILED))
+ pr_info("set sess %s to failed due to link_ev\n", pool_sess->sessname);
+ }
+ mutex_unlock(&clt_sess->lock);
+}
+
+static void rmr_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev)
+{
+ struct rmr_clt_sess *clt_sess = priv;
+
+ switch (ev) {
+ case RTRS_CLT_LINK_EV_DISCONNECTED:
+ pr_info("Rtrs link ev disconnected: session %s\n",
+ clt_sess->sessname);
+ clt_sess->state = RMR_CLT_SESS_DISCONNECTED;
+ set_pool_sess_states_to_failed(clt_sess);
+ break;
+ case RTRS_CLT_LINK_EV_RECONNECTED:
+ pr_info("Rtrs link ev reconnected: session %s\n",
+ clt_sess->sessname);
+ clt_sess->state = RMR_CLT_SESS_CONNECTED;
+ resend_join_pool(clt_sess);
+ break;
+ default:
+ pr_err("Unknown rtrs link event received (%d), "
+ "session: %s\n",
+ ev, clt_sess->sessname);
+ }
+}
+
+/*
+ * Gets an iu for I/O operations.
+ *
+ * Context:
+ * The call to this function should be protected with an srcu_read_lock.
+ */
+static struct rmr_clt_sess_iu *rmr_get_sess_iu(struct rmr_clt_pool_sess *pool_sess,
+ enum rtrs_clt_con_type con_type,
+ enum wait_type wait)
+{
+ struct rmr_pool *pool = pool_sess->pool;
+ struct rmr_clt_sess *clt_sess = pool_sess->clt_sess;
+ struct rmr_clt_sess_iu *sess_iu;
+ struct rtrs_permit *permit;
+
+ WARN_ON(!srcu_read_lock_held(&pool->sess_list_srcu));
+
+ if (clt_sess->state == RMR_CLT_SESS_DISCONNECTED) {
+ pr_info("The rmr client session %s state is disconnected\n", clt_sess->sessname);
+ return NULL;
+ }
+
+ sess_iu = kzalloc(sizeof(*sess_iu), GFP_KERNEL);
+ if (!sess_iu)
+ return NULL;
+
+ permit = rtrs_clt_get_permit(clt_sess->rtrs, con_type, wait);
+ if (unlikely(!permit)) {
+ kfree(sess_iu);
+ return NULL;
+ }
+
+ INIT_LIST_HEAD(&sess_iu->entry);
+ sess_iu->permit = permit;
+ sess_iu->pool_sess = pool_sess;
+
+ return sess_iu;
+}
+
+/*
+ * Gets the iu for user messages.
+ * It will be reference counted initialized with refcount
+ */
+static inline struct rmr_clt_sess_iu *rmr_msg_get_iu(struct rmr_clt_pool_sess *pool_sess,
+ enum rtrs_clt_con_type con_type,
+ enum wait_type wait, int refcount)
+{
+ struct rmr_pool *pool = pool_sess->pool;
+ struct rmr_clt_sess_iu *sess_iu;
+ int idx;
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+
+ sess_iu = rmr_get_sess_iu(pool_sess, con_type, wait);
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+ if (unlikely(!sess_iu))
+ return NULL;
+
+ init_waitqueue_head(&sess_iu->comp.wait);
+ sess_iu->comp.errno = INT_MAX;
+ atomic_set(&sess_iu->refcount, refcount);
+
+ return sess_iu;
+}
+
+/*
+ * reference counted put, refcount has to be initialized.
+ */
+void rmr_msg_put_iu(struct rmr_clt_pool_sess *pool_sess,
+ struct rmr_clt_sess_iu *sess_iu)
+{
+ if (atomic_dec_and_test(&sess_iu->refcount)) {
+ rtrs_clt_put_permit(pool_sess->clt_sess->rtrs, sess_iu->permit);
+ kfree(sess_iu);
+ }
+}
+
+/*
+ * put the sess_iu without reference counting.
+ * I/O does not need reference counting.
+ */
+static void rmr_put_sess_iu(struct rmr_clt_pool_sess *pool_sess,
+ struct rmr_clt_sess_iu *sess_iu)
+{
+ rtrs_clt_put_permit(pool_sess->clt_sess->rtrs, sess_iu->permit);
+ kfree(sess_iu);
+}
+
+void wake_up_iu_comp(struct rmr_clt_sess_iu *sess_iu)
+{
+ sess_iu->comp.errno = sess_iu->errno;
+ wake_up(&sess_iu->comp.wait);
+}
+
+void msg_conf(void *priv, int errno)
+{
+ struct rmr_clt_sess_iu *sess_iu = (struct rmr_clt_sess_iu *)priv;
+
+ sess_iu->errno = errno;
+ /* just schedule the work because kfree must not be done here */
+ schedule_work(&sess_iu->work);
+}
+
+static int send_usr_msg(struct rtrs_clt_sess *rtrs, int dir,
+ struct rmr_clt_sess_iu *sess_iu,
+ struct kvec *vec, size_t nr, size_t len,
+ struct scatterlist *sg, unsigned int sg_len,
+ void (*conf)(struct work_struct *work),
+ int *errno, enum rmr_wait_type wait)
+{
+ int err;
+ struct rtrs_clt_req_ops req_ops;
+
+ INIT_WORK(&sess_iu->work, conf);
+ req_ops = (struct rtrs_clt_req_ops){
+ .priv = sess_iu,
+ .conf_fn = msg_conf,
+ };
+
+ trace_send_usr_msg(dir, sess_iu);
+
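+	/*
+	 * comp.errno is preset to INT_MAX by rmr_msg_get_iu() and is only
+	 * overwritten by wake_up_iu_comp(); a leftover INT_MAX after the
+	 * wait below therefore means no response arrived in time.
+	 */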
+ err = rtrs_clt_request(dir, &req_ops, rtrs, sess_iu->permit,
+ vec, nr, len, sg, sg_len);
+ if (!err && wait) {
+ wait_event_timeout(sess_iu->comp.wait,
+ sess_iu->comp.errno != INT_MAX,
+ msecs_to_jiffies(RMR_CLT_SEND_MSG_TIMEOUT_MS));
+ *errno = sess_iu->comp.errno;
+ if (*errno == INT_MAX)
+ *errno = -ETIMEDOUT;
+ } else {
+ *errno = 0;
+ }
+ return err;
+}
+
+static int send_msg_rejoin_pool(struct rmr_clt_pool_sess *pool_sess, bool wait)
+{
+ struct rmr_msg_pool_cmd msg = {};
+ struct rmr_pool *pool = pool_sess->pool;
+ struct rmr_clt_sess *clt_sess = pool_sess->clt_sess;
+ int ret;
+
+ rmr_clt_init_cmd(pool, &msg);
+ msg.cmd_type = RMR_CMD_REJOIN_POOL;
+
+ msg.join_pool_cmd.rejoin = true;
+ msg.join_pool_cmd.chunk_size = pool->chunk_size;
+ msg.join_pool_cmd.queue_depth = clt_sess->queue_depth;
+
+ ret = rmr_clt_pool_send_cmd(pool_sess, &msg, wait);
+ if (ret)
+ pr_err("%s failed\n", rmr_get_cmd_name(msg.cmd_type));
+
+ return ret;
+}
+
+static int send_msg_join_pool(struct rmr_clt_pool_sess *pool_sess, bool create,
+ bool dirty, bool wait)
+{
+ struct rmr_msg_pool_cmd msg = {};
+ struct rmr_pool_member_info *mem_info;
+ struct rmr_pool *pool = pool_sess->pool;
+ struct rmr_clt_pool_sess *t_pool_sess;
+ struct rmr_clt_sess *clt_sess = pool_sess->clt_sess;
+ struct rmr_dirty_id_map *map;
+ int ret, i = 0, idx;
+
+ rmr_clt_init_cmd(pool_sess->pool, &msg);
+ msg.cmd_type = RMR_CMD_JOIN_POOL;
+
+ msg.join_pool_cmd.queue_depth = clt_sess->queue_depth;
+ msg.join_pool_cmd.chunk_size = pool->chunk_size;
+ msg.join_pool_cmd.rejoin = false;
+
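+	/*
+	 * Non-sync joins also carry the current pool membership, so the
+	 * joining storage node learns about the other members; on create,
+	 * each member additionally reports whether its dirty map is
+	 * non-empty.
+	 */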
+ if (!msg.sync) {
+ msg.join_pool_cmd.create = create;
+ msg.join_pool_cmd.dirty = dirty;
+ mem_info = &(msg.join_pool_cmd.mem_info);
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+ list_for_each_entry_srcu(t_pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ if (t_pool_sess->member_id == pool_sess->member_id)
+ continue;
+
+ map = rmr_pool_find_map(pool, t_pool_sess->member_id);
+ if (!map) {
+ pr_err("%s: Map with member_id %u does not exist\n",
+ __func__, t_pool_sess->member_id);
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+ return -ENOENT;
+ }
+
+ mem_info->p_mem_info[i].member_id = t_pool_sess->member_id;
+ /* Only relevant for create */
+ if (create)
+ mem_info->p_mem_info[i].c_dirty = !rmr_map_empty(map);
+ i++;
+ if (WARN_ON(i >= RMR_POOL_MAX_SESS))
+ break;
+ }
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+ mem_info->no_of_stor = i;
+ }
+
+ ret = rmr_clt_pool_send_cmd(pool_sess, &msg, wait);
+ if (ret)
+ pr_err("%s failed\n", rmr_get_cmd_name(msg.cmd_type));
+
+ return ret;
+}
+
+int send_msg_leave_pool(struct rmr_clt_pool_sess *pool_sess, bool delete, bool wait)
+{
+ struct rmr_msg_pool_cmd msg = {};
+ int ret;
+
+ rmr_clt_init_cmd(pool_sess->pool, &msg);
+ msg.cmd_type = RMR_CMD_LEAVE_POOL;
+
+ msg.leave_pool_cmd.member_id = pool_sess->member_id;
+ msg.leave_pool_cmd.delete = delete;
+
+ ret = rmr_clt_pool_send_cmd(pool_sess, &msg, wait);
+ if (ret)
+ pr_err("%s failed\n", rmr_get_cmd_name(msg.cmd_type));
+
+ return ret;
+}
+
+bool rmr_clt_sess_is_sync(struct rmr_clt_pool_sess *pool_sess)
+{
+ struct rmr_pool *pool = pool_sess->pool;
+ bool ret = false;
+
+ if (!pool) {
+ WARN(1, "for sess %s pool is not assigned\n",
+ pool_sess->clt_sess->sessname);
+ return false;
+ }
+
+ if (pool->sync) {
+ pr_debug("sess %s pool %s is sync (internal) clt sess\n",
+ pool_sess->clt_sess->sessname, pool->poolname);
+ ret = true;
+ } else {
+ pr_debug("sess %s pool %s is not sync clt sess\n",
+ pool_sess->clt_sess->sessname, pool->poolname);
+ ret = false;
+ }
+ return ret;
+}
+
+/**
+ * rmr_clt_send_pool_info() - Notify all other pool members of a membership change
+ *
+ * @pool_sess: The pool session of the member whose state is changing.
+ * @op: Operation: %RMR_POOL_INFO_OP_ADD or %RMR_POOL_INFO_OP_REMOVE.
+ * @mode: For ADD: %RMR_POOL_INFO_MODE_CREATE or %RMR_POOL_INFO_MODE_ASSEMBLE.
+ * For REMOVE: %RMR_POOL_INFO_MODE_DELETE or %RMR_POOL_INFO_MODE_DISASSEMBLE.
+ * @dirty: When op is ADD and mode is CREATE, indicates that @pool_sess
+ * has outstanding dirty data that the receiving node must track.
+ *
+ * Sends a POOL_INFO command to every other non-FAILED, non-REMOVING
+ * member in the pool so they can update their view of pool membership.
+ *
+ * Return:
+ * 0 on success, negative error code on failure.
+ *
+ * Context:
+ * This function blocks while sending the command.
+ */
+static int rmr_clt_send_pool_info(struct rmr_clt_pool_sess *pool_sess,
+ enum rmr_pool_info_op op, enum rmr_pool_info_mode mode,
+ bool dirty)
+{
+ struct rmr_pool *pool = pool_sess->pool;
+ struct rmr_clt_pool_sess *t_pool_sess;
+ struct rmr_msg_pool_cmd msg = {};
+ int idx, ret = 0;
+
+ rmr_clt_init_cmd(pool, &msg);
+ msg.cmd_type = RMR_CMD_POOL_INFO;
+
+ msg.pool_info_cmd.member_id = pool_sess->member_id;
+ msg.pool_info_cmd.operation = op;
+ msg.pool_info_cmd.mode = mode;
+
+ if (op == RMR_POOL_INFO_OP_ADD && mode == RMR_POOL_INFO_MODE_CREATE && dirty)
+ msg.pool_info_cmd.dirty = dirty;
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+ list_for_each_entry_srcu(t_pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ enum rmr_clt_pool_sess_state state;
+
+ /*
+ * No need to send the info message to the member who just joined.
+ */
+ if (t_pool_sess->member_id == pool_sess->member_id)
+ continue;
+
+ state = atomic_read(&t_pool_sess->state);
+ /*
+ * TODO: For FAILED session we have to store the missed
+ * msgs and send them later when the session recovers.
+ */
+ if (state == RMR_CLT_POOL_SESS_FAILED ||
+ state == RMR_CLT_POOL_SESS_REMOVING)
+ continue;
+
+ ret = rmr_clt_pool_send_cmd(t_pool_sess, &msg, WAIT);
+ if (ret) {
+ pr_err("%s failed with err %d\n", rmr_get_cmd_name(msg.cmd_type), ret);
+ break;
+ }
+ }
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+ return ret;
+}
+
+void resend_join_pool(struct rmr_clt_sess *clt_sess)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+
+ mutex_lock(&clt_sess->lock);
+
+ list_for_each_entry(pool_sess, &clt_sess->pool_sess_list, clt_sess_entry) {
+ int err;
+
+ err = send_msg_rejoin_pool(pool_sess, WAIT);
+ if (err) {
+ pr_err("send_msg_rejoin_pool failed for sess %s error %d\n",
+ pool_sess->sessname, err);
+ }
+ }
+ mutex_unlock(&clt_sess->lock);
+}
+
+int send_msg_enable_pool(struct rmr_clt_pool_sess *pool_sess, bool enable)
+{
+ struct rmr_msg_pool_cmd msg = {};
+ int ret;
+
+ rmr_clt_init_cmd(pool_sess->pool, &msg);
+ msg.cmd_type = RMR_CMD_ENABLE_POOL;
+
+ msg.enable_pool_cmd.enable = enable;
+
+	ret = rmr_clt_pool_send_cmd(pool_sess, &msg, WAIT);
+	if (ret)
+		pr_err("%s failed\n", rmr_get_cmd_name(msg.cmd_type));
+
+	return ret;
+}
+
+static const char *rmr_clt_pool_sess_state_names[] = {
+ [0] = "invalid state",
+ [RMR_CLT_POOL_SESS_CREATED] = "created",
+ [RMR_CLT_POOL_SESS_NORMAL] = "normal",
+ [RMR_CLT_POOL_SESS_FAILED] = "failed",
+ [RMR_CLT_POOL_SESS_RECONNECTING] = "reconnecting",
+ [RMR_CLT_POOL_SESS_REMOVING] = "removing"
+};
+
+const char *rmr_clt_sess_state_str(enum rmr_clt_pool_sess_state state)
+{
+	if (state >= ARRAY_SIZE(rmr_clt_pool_sess_state_names))
+		return "unknown state";
+
+	return rmr_clt_pool_sess_state_names[state];
+}
+
+int rmr_clt_reconnect_sess(struct rmr_clt_sess *clt_sess,
+ const struct rtrs_addr *paths,
+ size_t path_cnt)
+{
+ struct rtrs_attrs attrs;
+ struct rtrs_clt_ops rtrs_ops;
+ int err = 0;
+
+ rtrs_ops = (struct rtrs_clt_ops){
+ .priv = clt_sess,
+ .link_ev = rmr_clt_link_ev,
+ };
+
+ clt_sess->rtrs = rtrs_clt_open(&rtrs_ops, clt_sess->sessname,
+ paths, path_cnt, RTRS_PORT,
+ 0, /* Do not use pdu of rtrs */
+ RECONNECT_DELAY,
+ MAX_RECONNECTS, 0);
+ if (IS_ERR(clt_sess->rtrs)) {
+ err = PTR_ERR(clt_sess->rtrs);
+ pr_err("rtrs_clt_open error %d\n", err);
+ goto err;
+ }
+
+ err = rtrs_clt_query(clt_sess->rtrs, &attrs);
+ if (unlikely(err)) {
+ pr_err("rtrs_clt_query error %d\n", err);
+ goto close_sess;
+ }
+ clt_sess->max_io_size = attrs.max_io_size;
+ clt_sess->queue_depth = attrs.queue_depth;
+ clt_sess->max_segments = attrs.max_segments;
+
+ clt_sess->state = RMR_CLT_SESS_CONNECTED;
+
+ resend_join_pool(clt_sess);
+
+ return err;
+
+close_sess:
+ rtrs_clt_close(clt_sess->rtrs);
+err:
+ return err;
+}
+
+/* TODO: we do not use rsp in this function, do we need it as an argument? */
+static int rmr_clt_handle_rejoin_rsp(struct rmr_clt_pool_sess *pool_sess,
+				     struct rmr_msg_pool_cmd_rsp *rsp)
+{
+ int err = 0;
+
+ if (rmr_clt_sess_is_sync(pool_sess)) {
+ /*
+ * The client on sync side does not need map update
+ * hence goes to "normal" state directly.
+ * NB: FAILED => NORMAL
+ */
+ pool_sess_change_state(pool_sess, RMR_CLT_POOL_SESS_NORMAL);
+ } else {
+ /*
+ * The client on non-sync side needs map update,
+ *
+ * A map update is to be triggered, which updates the map,
+ * and then sets state to "normal"
+ */
+ pool_sess_change_state(pool_sess, RMR_CLT_POOL_SESS_RECONNECTING);
+
+ /*
+ * Send the info about the pool to all the storages.
+ * Contains IDs of storages connected to this pool.
+ */
+ err = rmr_clt_send_pool_info(pool_sess, RMR_POOL_INFO_OP_ADD,
+ RMR_POOL_INFO_MODE_ASSEMBLE, false);
+		if (err) {
+			pr_err("Rejoin: rmr_clt_send_pool_info failed for session %s\n",
+			       pool_sess->sessname);
+			return err;
+		}
+
+ err = rmr_clt_pool_try_enable(pool_sess->pool);
+ if (err)
+ pr_err("%s: pool %s try_enable failed for sess %s: %d\n",
+ __func__, pool_sess->pool->poolname,
+ pool_sess->sessname, err);
+ }
+
+ return err;
+}
+
+static void rmr_clt_handle_join_rsp(struct rmr_clt_pool_sess *pool_sess,
+ struct rmr_msg_pool_cmd_rsp *rsp)
+{
+ struct rmr_pool *pool = pool_sess->pool;
+ struct rmr_pool_md *clt_md;
+ u64 mapped_size;
+
+ clt_md = &pool->pool_md;
+
+ pool_sess->ver = min_t(u8, rsp->ver, RMR_PROTO_VER_MAJOR);
+ pool_sess->member_id = rsp->member_id;
+ xa_store(&pool->stg_members, pool_sess->member_id, pool_sess, GFP_KERNEL);
+
+ pool->chunk_size = rsp->join_pool_cmd_rsp.chunk_size;
+ pool->chunk_size_shift = ilog2(pool->chunk_size);
+ clt_md->chunk_size = pool->chunk_size;
+
+ mapped_size = rsp->join_pool_cmd_rsp.mapped_size;
+ if (mapped_size) {
+ pool->mapped_size = mapped_size;
+ pool->pool_md.mapped_size = mapped_size;
+ rmr_pool_update_no_of_chunk(pool);
+ pr_info("clt join_pool: mapped size %llu\n", pool->mapped_size);
+ }
+}
+
+static int cmd_process_rsp(struct rmr_clt_pool_sess *pool_sess,
+			   struct rmr_msg_pool_cmd_rsp *rsp)
+{
+ int err = 0;
+
+ pr_debug("rsp, cmd_type %d, member_id %d, err %d\n",
+ rsp->cmd_type, rsp->member_id, rsp->err);
+
+ if (rsp->err)
+ return rsp->err;
+
+ switch (rsp->cmd_type) {
+ case RMR_CMD_MAP_CHECK:
+ return rmr_clt_handle_map_check_rsp(pool_sess, rsp);
+ case RMR_CMD_STORE_CHECK:
+ return rmr_clt_handle_store_check_rsp(pool_sess, rsp);
+ case RMR_CMD_MAP_READY:
+ case RMR_CMD_MAP_SEND:
+ case RMR_CMD_MAP_BUF_DONE:
+ case RMR_CMD_MAP_DONE:
+ case RMR_CMD_MAP_DISABLE:
+ case RMR_CMD_LEAVE_POOL:
+ case RMR_CMD_LAST_IO_TO_MAP:
+ case RMR_CMD_MD_SEND:
+ case RMR_CMD_MAP_SET_VER:
+ case RMR_CMD_SEND_DISCARD:
+ case RMR_CMD_DISCARD_CLEAR_FLAG:
+ case RMR_CMD_POOL_INFO:
+ pr_debug("%s: No rsp handling for %s\n", __func__, rmr_get_cmd_name(rsp->cmd_type));
+ break;
+ case RMR_CMD_REJOIN_POOL:
+ return rmr_clt_handle_rejoin_rsp(pool_sess, rsp);
+ case RMR_CMD_JOIN_POOL:
+ rmr_clt_handle_join_rsp(pool_sess, rsp);
+ break;
+ case RMR_CMD_ENABLE_POOL:
+ pool_sess->ver = min_t(u8, rsp->ver, RMR_PROTO_VER_MAJOR);
+ break;
+ default:
+ pr_warn("%s: switch default type: %d\n", __func__, rsp->cmd_type);
+
+ err = -EINVAL;
+ }
+
+ return err;
+}
+
+static void msg_pool_cmd_conf(struct work_struct *work)
+{
+ struct rmr_clt_sess_iu *sess_iu = container_of(work, struct rmr_clt_sess_iu, work);
+ struct rmr_msg_pool_cmd_rsp *rsp = sess_iu->buf;
+ struct rmr_clt_pool_sess *pool_sess = sess_iu->pool_sess;
+
+ pr_debug("pool cmd for %s session %s member_id %d conf with errno %d\n",
+ pool_sess->pool->poolname, pool_sess->sessname,
+ pool_sess->member_id, sess_iu->errno);
+
+ if (!sess_iu->errno) {
+ /*
+ * We need to check if there was an error while processing the cmd
+ * on the server side. If there was, then we fail the command.
+ */
+ sess_iu->errno = cmd_process_rsp(pool_sess, rsp);
+ }
+
+ kfree(rsp);
+ wake_up_iu_comp(sess_iu);
+ rmr_msg_put_iu(pool_sess, sess_iu);
+}
+
+void rmr_clt_init_cmd(struct rmr_pool *pool, struct rmr_msg_pool_cmd *msg)
+{
+ memset(msg, 0, sizeof(*msg));
+
+ msg->hdr.group_id = cpu_to_le32(pool->group_id);
+ msg->hdr.type = cpu_to_le16(RMR_MSG_CMD);
+ msg->hdr.__padding = 0;
+ msg->ver = RMR_PROTO_VER_MAJOR;
+ msg->sync = pool->sync;
+
+	strscpy(msg->pool_name, pool->poolname, sizeof(msg->pool_name));
+}
+EXPORT_SYMBOL(rmr_clt_init_cmd);
+
+int rmr_clt_pool_send_cmd(struct rmr_clt_pool_sess *pool_sess,
+ struct rmr_msg_pool_cmd *msg, bool wait)
+{
+ struct rmr_clt_sess *clt_sess = pool_sess->clt_sess;
+ struct rmr_msg_pool_cmd_rsp *rsp;
+ struct rmr_clt_sess_iu *sess_iu;
+ struct kvec vec = {
+ .iov_base = msg,
+ .iov_len = sizeof(*msg)
+ };
+ int err, errno;
+
+ rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
+ if (unlikely(!rsp))
+ return -ENOMEM;
+
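+	/*
+	 * Two references on the iu: one is dropped by msg_pool_cmd_conf()
+	 * once the response completes, the other by the put at the end of
+	 * this function. If the send itself fails, the callback never runs
+	 * and both puts happen on the error path here.
+	 */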
+ sess_iu = rmr_msg_get_iu(pool_sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT, 2);
+ if (unlikely(!sess_iu)) {
+ kfree(rsp);
+ return -ENOMEM;
+ }
+
+ sess_iu->buf = rsp;
+ sg_init_one(&sess_iu->sg, rsp, sizeof(*rsp));
+
+ err = send_usr_msg(clt_sess->rtrs, READ, sess_iu,
+ &vec, 1, sizeof(*rsp), &sess_iu->sg, 1,
+ msg_pool_cmd_conf, &errno, wait);
+ if (unlikely(err)) {
+ rmr_msg_put_iu(pool_sess, sess_iu);
+ kfree(rsp);
+ } else {
+ err = errno;
+ }
+
+ rmr_msg_put_iu(pool_sess, sess_iu);
+
+ return err;
+}
+
+/*
+ * Pre-requisite: srcu read lock should be held by the caller
+ */
+static struct rmr_clt_pool_sess *
+rmr_clt_get_first_normal_session(struct rmr_pool *pool)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ if (atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_NORMAL)
+ return pool_sess;
+ }
+
+ return NULL;
+}
+
+/**
+ * rmr_clt_pool_send_all() - Send a command to all sessions in the pool
+ *
+ * @pool: The client pool which sends the command message
+ * @msg: The command message of pool
+ *
+ * Description:
+ * When sending messages to all pool sessions, it will continue to send
+ * regardless of the failure of the previous communication.
+ *
+ * Return:
+ * 0 if at least one successful request
+ * less than 0 if all requests failed
+ */
+int rmr_clt_pool_send_all(struct rmr_pool *pool, struct rmr_msg_pool_cmd *msg)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+ int idx, err = 0;
+ u8 member_id = 0;
+ int ret = 0;
+
+ if (msg->cmd_type == RMR_CMD_SEND_DISCARD)
+ member_id = msg->send_discard_cmd.member_id;
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ /* The node has had discards. */
+ if (pool_sess->member_id == member_id)
+ continue;
+
+ if (atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_FAILED)
+ continue;
+
+ pr_info("pool %s send cmd %d to sess %s\n",
+ pool->poolname, msg->cmd_type, pool_sess->sessname);
+
+ /* The err code reflects the response from this pool_sess. */
+ err = rmr_clt_pool_send_cmd(pool_sess, msg, WAIT);
+ if (err) {
+ pr_err("pool %s sending cmd to sess %s failed, err=%d\n",
+ pool->poolname, pool_sess->sessname, err);
+ continue;
+ }
+
+ pr_info("pool %s done sending cmd %d to sess %s\n",
+ pool->poolname, msg->cmd_type, pool_sess->sessname);
+ ret++;
+ }
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+ if (ret)
+ return 0;
+
+ return -ENETUNREACH;
+}
+EXPORT_SYMBOL(rmr_clt_pool_send_all);
+
+/**
+ * rmr_clt_send_cmd_with_data_all() - Send a command with data to all sessions
+ *				      in the pool
+ *
+ * @pool: The client pool which sends the command message
+ * @msg: The command message to send
+ * @buf: Data buffer sent along with the command
+ * @buflen: Length of @buf in bytes
+ *
+ * Return:
+ * 0 on success of all sends
+ * less than 0 if all sends failed
+ * positive number of failed sends
+ */
+int rmr_clt_send_cmd_with_data_all(struct rmr_pool *pool, struct rmr_msg_pool_cmd *msg,
+ void *buf, unsigned int buflen)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+ int idx, err = 0;
+ bool ret = false;
+ int errno = 0;
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ if (atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_FAILED) {
+ errno++;
+ continue;
+ }
+
+ pr_debug("pool %s send cmd %d to sess %s\n",
+ pool->poolname, msg->cmd_type, pool_sess->sessname);
+ err = rmr_clt_send_cmd_with_data(pool, pool_sess, msg, buf, buflen);
+ if (err) {
+ errno++;
+ pr_debug("pool %s sending cmd to sess %s failed, err=%d\n",
+ pool->poolname, pool_sess->sessname, err);
+ continue;
+ }
+
+ pr_debug("pool %s done sending cmd %d to sess %s\n",
+ pool->poolname, msg->cmd_type, pool_sess->sessname);
+ ret = true;
+ }
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+ if (ret)
+ return errno;
+
+ return -EINVAL;
+}
+EXPORT_SYMBOL(rmr_clt_send_cmd_with_data_all);
+
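+/*
+ * Outline of the last IO update:
+ *  1) verify that every member recorded in the pool metadata is assembled
+ *     and in RECONNECTING state
+ *  2) query the map versions and pick the session holding the highest one,
+ *     collecting members flagged for disk-replacement discards on the way
+ *  3) send the pending discards from the chosen session
+ *  4) spread the chosen (latest) map to all other storage nodes
+ *  5) convert the last IOs into map entries on each node and spread again
+ *  6) read back the resulting map
+ */
+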
+/**
+ * rmr_clt_start_last_io_update() - Do the last IO update
+ *
+ * @pool: The pool
+ *
+ * Description:
+ * Last IO update is needed in case a pserver went down while connected to a pool.
+ * A pserver going down while performing IOs could mean that some IOs have been
+ * executed on some nodes but not on all. This function takes the last 'queue_depth'
+ * IOs on each storage node and makes sure they are synced between all the nodes.
+ * Before performing the last IO conversion, it also makes sure that all the storage
+ * nodes have the latest map.
+ *
+ * Return:
+ * 0 on success
+ * Error value on failure
+ *
+ * Context:
+ * srcu_read_lock should be held while calling this function.
+ */
+int rmr_clt_start_last_io_update(struct rmr_pool *pool)
+{
+	struct rmr_clt_pool_sess *pool_sess_chosen = NULL, *pool_sess;
+ struct rmr_msg_pool_cmd msg = {};
+ u64 map_ver, highest_map_ver = 0;
+ int j, err, idx, ret = 0;
+ int discard_ids[RMR_POOL_MAX_SESS];
+ u8 id, nr_discards = 0;
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+
+ for (j = 0; j < RMR_POOL_MAX_SESS; j++) {
+ struct rmr_clt_pool_sess *ps;
+ u8 mid = pool->pool_md.srv_md[j].member_id;
+
+ if (!mid)
+ continue;
+
+ ps = xa_load(&pool->stg_members, mid);
+ if (!ps) {
+ pr_err("%s: member_id %u not yet assembled\n",
+ __func__, mid);
+ err = -EINVAL;
+ goto out;
+ }
+ if (atomic_read(&ps->state) != RMR_CLT_POOL_SESS_RECONNECTING) {
+ pr_err("%s: member_id %u not in reconnecting state\n",
+ __func__, mid);
+ err = -EINVAL;
+ goto out;
+ }
+ }
+
+ /*
+ * Before pserver died, it could be that one or more storage nodes were down.
+ * This would mean there is a possibility that those storage nodes will not have
+ * the latest map. But that can create problems.
+ * We need to make sure that every storage node has the latest map.
+ * Hence, find out which node has the latest map first,
+ */
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ err = send_map_get_version(pool_sess, &map_ver);
+ if (err) {
+ pr_err("%s: Failed to read map version for sess %s\n",
+ __func__, pool_sess->sessname);
+ err = -EINVAL;
+ goto out;
+ }
+
+ if (RMR_STORE_IS_REPLACE(map_ver)) {
+ map_ver = RMR_STORE_GET_VER(map_ver);
+ discard_ids[nr_discards] = pool_sess->member_id;
+ nr_discards++;
+ }
+
+ if (map_ver > highest_map_ver) {
+ highest_map_ver = map_ver;
+ pool_sess_chosen = pool_sess;
+ }
+ }
+
+	if (!pool_sess_chosen) {
+		pr_err("%s: no session with a readable map version found\n", __func__);
+		err = -EINVAL;
+		goto out;
+	}
+
+ for (j = 0; j < nr_discards; j++) {
+ id = discard_ids[j];
+ pr_info("%s: Send discard req %d to S%d\n",
+ __func__, id, pool_sess_chosen->member_id);
+ err = send_discard(pool_sess_chosen, RMR_CMD_SEND_DISCARD, id);
+ if (err) {
+ pr_err("%s: Failed to send discard request to %s\n",
+ __func__, pool_sess_chosen->sessname);
+ goto out;
+ }
+ }
+
+ /*
+ * We have the storage node with the latest map,
+ * make sure the latest map is sent to all other storage nodes.
+ */
+ err = rmr_clt_spread_map(pool, pool_sess_chosen, false, false);
+ if (err) {
+ pr_err("%s: Failed to spread the latest map\n", __func__);
+ goto out;
+ }
+
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ for (j = 0; j < nr_discards; j++) {
+ id = discard_ids[j];
+ pr_info("%s: Send discard clear req %d to S%d\n",
+ __func__, id, pool_sess->member_id);
+ err = send_discard(pool_sess, RMR_CMD_DISCARD_CLEAR_FLAG, id);
+ if (err) {
+ pr_err("%s: Failed to clear discard state on %s\n",
+ __func__, pool_sess->sessname);
+ } else {
+ ret++;
+ }
+ }
+ }
+
+ if (nr_discards && !ret) {
+ pr_err("%s: Failed to clear discard state on any storage node\n", __func__);
+ err = -EINVAL;
+ goto out;
+ }
+
+ /*
+ * Now that we are done with the dispersing of the latest map,
+ * we can start last IO update.
+ */
+ rmr_clt_init_cmd(pool, &msg);
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ msg.cmd_type = RMR_CMD_LAST_IO_TO_MAP;
+ err = rmr_clt_pool_send_cmd(pool_sess, &msg, WAIT);
+ if (err) {
+ pr_err("%s: %s failed\n", __func__, rmr_get_cmd_name(msg.cmd_type));
+ goto out;
+ }
+
+ err = rmr_clt_spread_map(pool, pool_sess, true, false);
+ if (err) {
+ pr_err("%s: Failed to spread last_io converted map\n", __func__);
+ goto out;
+ }
+ }
+
+ err = rmr_clt_read_map(pool);
+ if (err) {
+ pr_err("%s: rmr_clt_read_map failed with err %d\n", __func__, err);
+ goto out;
+ }
+
+out:
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+ return err;
+}
+
+/**
+ * rmr_clt_enable_sess() - Enable the rmr clt pool sessions
+ *
+ * @pool_sess: The rmr clt pool session to enable
+ *
+ * Description:
+ * This function handles the enable request, both for pool sessions in
+ * maintenance mode and for those that are not.
+ *
+ * Return:
+ * 0 on success
+ * Error value on failure
+ */
+int rmr_clt_enable_sess(struct rmr_clt_pool_sess *pool_sess)
+{
+ struct rmr_pool *pool = pool_sess->pool;
+ int pool_sess_state, err = 0;
+
+ pr_info("%s: For session %s of pool %s\n",
+ __func__, pool_sess->sessname, pool->poolname);
+
+ if (!pool_sess->maintenance_mode) {
+ /*
+ * Simple enable, not related to maintenance.
+ * Manual enable is only allowed for sessions in "created" state
+ */
+ pool_sess_state = atomic_read(&pool_sess->state);
+ if (pool_sess_state != RMR_CLT_POOL_SESS_CREATED) {
+ pr_err("Cannot manually enable session: state %d\n", pool_sess_state);
+ err = -EINVAL;
+ goto out;
+ }
+
+ err = send_msg_enable_pool(pool_sess, 1);
+ if (err) {
+ pr_err("Failed to send enable to pool %s. Err %d\n",
+ pool->poolname, err);
+ goto out;
+ }
+
+ pool_sess_change_state(pool_sess, RMR_CLT_POOL_SESS_NORMAL);
+ } else {
+ /*
+ * Enable when in maintenance mode.
+ */
+ err = rmr_clt_unset_pool_sess_mm(pool_sess);
+ }
+
+out:
+ return err;
+}
+
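+/*
+ * Illustrative call sketch (assumed sysfs-triggered admin flow): a manual
+ * enable lands here and takes one of the two branches above.
+ *
+ *	err = rmr_clt_enable_sess(pool_sess);
+ *
+ * Outside maintenance mode this moves a CREATED session to NORMAL via
+ * send_msg_enable_pool(); in maintenance mode it defers to
+ * rmr_clt_unset_pool_sess_mm().
+ */
+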
+/**
+ * rmr_clt_create_sess() - Allocate and initialize an rmr client session
+ *
+ * @sessname: Name to be given to the new session being created.
+ * @paths: RTRS paths created for the session.
+ * @path_cnt: Number of paths.
+ *
+ * Return:
+ * Pointer to rmr_clt_sess on success
+ * ERR_PTR on failure
+ *
+ * Description:
+ * Create a new session to the storage node reachable via @paths.
+ * Once this function returns, an rmr_clt_pool_sess can use the session
+ * to submit IO over the RTRS connection.
+ *
+ * Context:
+ * This function blocks while creating the session
+ */
+static struct rmr_clt_sess *rmr_clt_create_sess(const char *sessname,
+ const struct rtrs_addr *paths,
+ size_t path_cnt)
+{
+ struct rmr_clt_sess *clt_sess;
+ struct rtrs_attrs attrs;
+ struct rtrs_clt_ops rtrs_ops;
+ int err;
+
+ clt_sess = alloc_clt_sess(sessname);
+ if (IS_ERR(clt_sess)) {
+ pr_err("Session '%s' can not be allocated in pool\n", sessname);
+ return clt_sess; // TODO: isit err_cast here?
+ }
+
+ rtrs_ops = (struct rtrs_clt_ops){
+ .priv = clt_sess,
+ .link_ev = rmr_clt_link_ev,
+ };
+	/*
+	 * No existing session was found by the caller, so establish a new
+	 * RTRS connection and proceed.
+	 */
+ clt_sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname,
+ paths, path_cnt, RTRS_PORT,
+ 0, /* Do not use pdu of rtrs */
+ RECONNECT_DELAY,
+ MAX_RECONNECTS, 0);
+ if (IS_ERR(clt_sess->rtrs)) {
+ err = PTR_ERR(clt_sess->rtrs);
+ pr_err("rtrs_clt_open error %d\n", err);
+ goto free_clt_sess;
+ }
+ err = rtrs_clt_query(clt_sess->rtrs, &attrs);
+ if (unlikely(err)) {
+ pr_err("rtrs_clt_query error %d\n", err);
+ goto close_sess;
+ }
+ clt_sess->max_io_size = attrs.max_io_size;
+ clt_sess->queue_depth = attrs.queue_depth;
+ clt_sess->max_segments = attrs.max_segments;
+
+ err = rmr_clt_create_clt_sess_sysfs_files(clt_sess);
+ if (err) {
+ pr_err("failed to crete sysfs files for sess %s, err=%d\n",
+ clt_sess->sessname, err);
+ goto close_sess;
+ }
+ clt_sess->state = RMR_CLT_SESS_CONNECTED;
+
+ mutex_lock(&g_sess_lock);
+ list_add(&clt_sess->g_list, &g_sess_list);
+ mutex_unlock(&g_sess_lock);
+
+ return clt_sess;
+
+close_sess:
+ rtrs_clt_close(clt_sess->rtrs);
+
+free_clt_sess:
+ kfree(clt_sess);
+
+ return ERR_PTR(err);
+}
+
+/**
+ * rmr_clt_pool_try_enable() - Trigger pool session recovery if conditions are met
+ *
+ * @pool: The pool to check
+ *
+ * Scans pool sessions and fires the appropriate recovery action:
+ *
+ * Case 1: ≥1 NORMAL session exists → spread its map (with enable=true) to all
+ * non-NORMAL sessions, then set them to NORMAL on the client side
+ * Case 2: Exactly one was_last_authoritative RECONNECTING session exists →
+ * enable it directly (data is complete, no map needed), then spread
+ * its map to remaining sessions
+ * Cases 3/4: All pool_md members present and RECONNECTING → last_io_update
+ *
+ * Return: 0 on success or when conditions are not yet met, negative error on failure.
+ */
+int rmr_clt_pool_try_enable(struct rmr_pool *pool)
+{
+ struct rmr_clt_pool *clt_pool = (struct rmr_clt_pool *)pool->priv;
+ struct rmr_clt_pool_sess *pool_sess, *normal_sess, *auth_sess;
+ bool any_member = false;
+ int idx, j, err = 0;
+
+ pr_info("%s: Started for pool %s\n", __func__, pool->poolname);
+
+ /*
+ * clt_pool_lock is held across all RPC round-trips below (MAP_READY,
+ * MAP_SEND, MAP_DONE, last_io_update exchanges). This serialises
+ * concurrent try_enable calls and prevents rmr_clt_open/close from
+ * racing with recovery. The RPC send path (rmr_clt_pool_send_cmd)
+ * uses per-session permits and does not acquire clt_pool_lock, so
+ * there is no deadlock. rmr_clt_open and rmr_clt_close use
+ * mutex_trylock and mutex_lock respectively to handle this.
+ */
+ mutex_lock(&clt_pool->clt_pool_lock);
+
+ normal_sess = NULL;
+ auth_sess = NULL;
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ int state = atomic_read(&pool_sess->state);
+
+ if (state == RMR_CLT_POOL_SESS_NORMAL) {
+ if (!normal_sess)
+ normal_sess = pool_sess;
+ } else if (state == RMR_CLT_POOL_SESS_RECONNECTING &&
+ pool_sess->was_last_authoritative &&
+ !pool_sess->maintenance_mode &&
+ !auth_sess) {
+ auth_sess = pool_sess;
+ }
+ }
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+ /*
+ * Invariant: at most one was_last_authoritative session can exist
+ * (guaranteed by atomic_dec_and_test in pool_sess_change_state), and
+ * it cannot coexist with a NORMAL session (if a NORMAL session exists,
+ * the pool never fully went to FAILED, so no session gets the flag).
+ */
+ if (WARN_ON(auth_sess && normal_sess)) {
+ err = -EINVAL;
+ goto out;
+ }
+
+ /* Case 2: was_last_authoritative session — enable it directly, then spread */
+ if (auth_sess) {
+ err = send_msg_enable_pool(auth_sess, 1);
+ if (err) {
+ pr_err("%s: pool %s failed to enable auth sess %s: %d\n",
+ __func__, pool->poolname, auth_sess->sessname, err);
+ goto out;
+ }
+ pool_sess_change_state(auth_sess, RMR_CLT_POOL_SESS_NORMAL);
+ normal_sess = auth_sess;
+ }
+
+ /* Case 1: ≥1 NORMAL session → spread map to all non-NORMAL sessions */
+ if (normal_sess) {
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+ err = rmr_clt_spread_map(pool, normal_sess, true, true);
+ if (err)
+ pr_err("%s: pool %s spread map from %s failed: %d\n",
+ __func__, pool->poolname, normal_sess->sessname, err);
+ else
+ goto out_normal;
+
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+ goto out;
+ }
+
+ /* Cases 3/4: all pool_md members present and RECONNECTING */
+ for (j = 0; j < RMR_POOL_MAX_SESS; j++) {
+ struct rmr_clt_pool_sess *ps;
+ u8 mid = pool->pool_md.srv_md[j].member_id;
+
+ if (!mid)
+ continue;
+
+ any_member = true;
+ ps = xa_load(&pool->stg_members, mid);
+ if (!ps || atomic_read(&ps->state) != RMR_CLT_POOL_SESS_RECONNECTING ||
+ ps->maintenance_mode) {
+ pr_info("%s: pool %s member_id %u not yet in reconnecting/mm, waiting\n",
+ __func__, pool->poolname, mid);
+ goto out;
+ }
+ }
+
+ if (!any_member) {
+ pr_info("%s: pool %s has no members in pool_md, nothing to do\n",
+ __func__, pool->poolname);
+ goto out;
+ }
+
+ pr_info("%s: pool %s all members reconnecting, starting last_io_update\n",
+ __func__, pool->poolname);
+
+ err = rmr_clt_start_last_io_update(pool);
+ if (err) {
+ pr_err("%s: pool %s last_io_update failed: %d\n",
+ __func__, pool->poolname, err);
+ goto out;
+ }
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+out_normal:
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ if (atomic_read(&pool_sess->state) != RMR_CLT_POOL_SESS_RECONNECTING ||
+ pool_sess->maintenance_mode)
+ continue;
+
+ pool_sess_change_state(pool_sess, RMR_CLT_POOL_SESS_NORMAL);
+ }
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+out:
+ mutex_unlock(&clt_pool->clt_pool_lock);
+ return err;
+}
+
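+/*
+ * Example timeline (illustrative only) for the cases handled above,
+ * assuming a three-member pool that fully failed and reconnects one
+ * node at a time:
+ *
+ *	S1 reconnects (was_last_authoritative) -> Case 2: enable S1 directly
+ *	S2 reconnects                          -> Case 1: spread S1's map to S2
+ *	S3 reconnects                          -> Case 1: spread S1's map to S3
+ *
+ * Had no member carried was_last_authoritative, the function would wait
+ * until all three were RECONNECTING and then run last_io_update
+ * (Cases 3/4).
+ */
+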
+/**
+ * rmr_clt_read_pool_md() - Read the full pool_md from a storage server's disk
+ *
+ * @pool_sess: The pool session to read from.
+ *
+ * Sends RMR_CMD_MD_SEND with read_full_md=1 to the given session and imports
+ * the returned srv_md[] entries into pool->pool_md, skipping already-known
+ * members. Used during add_sess mode=assemble so the client learns all pool
+ * member IDs from the server's on-disk metadata, not only the one being
+ * assembled.
+ *
+ * Return:
+ * 0 on success, negative error code on failure.
+ */
+static int rmr_clt_read_pool_md(struct rmr_clt_pool_sess *pool_sess, bool first)
+{
+ struct rmr_pool *pool = pool_sess->pool;
+ struct rmr_msg_pool_cmd msg = {};
+ struct rmr_pool_md *remote_md;
+ int i, err;
+
+ remote_md = kzalloc(sizeof(*remote_md), GFP_KERNEL);
+ if (!remote_md)
+ return -ENOMEM;
+
+ rmr_clt_init_cmd(pool, &msg);
+ msg.cmd_type = RMR_CMD_MD_SEND;
+ msg.md_send_cmd.src_mapped_size = pool->mapped_size;
+ msg.md_send_cmd.sender_id = pool_sess->member_id;
+ msg.md_send_cmd.read_full_md = 1;
+
+ err = rmr_clt_send_cmd_with_data(pool, pool_sess, &msg,
+ remote_md, sizeof(*remote_md));
+ if (err) {
+ pr_err("%s: failed to read pool_md from sess %s: %d\n",
+ __func__, pool_sess->sessname, err);
+ goto out;
+ }
+
+ for (i = 0; i < RMR_POOL_MAX_SESS; i++) {
+ u8 mid = remote_md->srv_md[i].member_id;
+ int idx;
+
+ if (!mid)
+ continue;
+
+ idx = rmr_pool_find_md(&pool->pool_md, mid, first);
+ if (idx < 0)
+ continue;
+
+ if (!pool->pool_md.srv_md[idx].member_id) {
+ /* New entry — import blindly */
+ memcpy(&pool->pool_md.srv_md[idx], &remote_md->srv_md[i],
+ sizeof(struct rmr_srv_md));
+ } else {
+ /* Already known — verify stable fields are consistent */
+ if (pool->pool_md.srv_md[idx].mapped_size !=
+ remote_md->srv_md[i].mapped_size)
+ pr_warn("%s: member_id %u mapped_size mismatch: "
+ "expected %llu, got %llu from sess %s\n",
+ __func__, mid,
+ pool->pool_md.srv_md[idx].mapped_size,
+ remote_md->srv_md[i].mapped_size,
+ pool_sess->sessname);
+ }
+ }
+
+out:
+ kfree(remote_md);
+ return err;
+}
+
+/**
+ * rmr_clt_process_non_sync_sess() - Set up map and notify peers for a new non-sync session
+ *
+ * @pool_sess: The newly added pool session.
+ * @create: True if this is a fresh pool creation; false for an assemble of an
+ * existing pool.
+ * @dirty: True if there are already other sessions in the pool; the new member's
+ * map will be marked fully dirty to trigger a resync.
+ *
+ * Creates the dirty map for @pool_sess and informs all existing pool members
+ * about the new storage node joining. On failure the map is removed.
+ *
+ * Return:
+ * 0 on success, negative error code on failure.
+ */
+static int rmr_clt_process_non_sync_sess(struct rmr_clt_pool_sess *pool_sess, bool create,
+ bool dirty)
+{
+ struct rmr_pool *pool = pool_sess->pool;
+ struct rmr_dirty_id_map *map;
+ enum rmr_pool_info_mode mode;
+ u8 created_mids[RMR_POOL_MAX_SESS];
+ int created_cnt = 0;
+ int i, err = 0;
+
+	/*
+	 * The mapped size of the pool is set once a backend device has been
+	 * mapped on the client. When a new client pool session extends this
+	 * pool, a map for the new server node must be created in the client
+	 * pool as well.
+	 */
+ if (!pool->mapped_size) {
+ pr_err("%s: pool %s mapped_size is 0\n",
+ __func__, pool->poolname);
+ err = -EINVAL;
+ goto out;
+ }
+
+ pr_info("Through add_sess, pool %s mapped_size %llu\n",
+ pool->poolname, pool->mapped_size);
+
+ rmr_pool_update_no_of_chunk(pool);
+
+ if (create) {
+ if (rmr_pool_find_map(pool, pool_sess->member_id)) {
+ pr_err("%s: pool %s map for member_id %u already exists\n",
+ __func__, pool->poolname, pool_sess->member_id);
+ err = -EEXIST;
+ goto out;
+ }
+
+ map = rmr_map_create(pool, pool_sess->member_id);
+ if (IS_ERR(map)) {
+ err = PTR_ERR(map);
+ pr_err("%s: pool %s failed to create map for member_id %u\n",
+ __func__, pool->poolname, pool_sess->member_id);
+ goto out;
+ }
+
+ /*
+ * During pool creation, all storage nodes must start with identical
+ * data. The first node added is taken as the clean reference; any
+ * subsequent node joining must be fully synced from it.
+ * Mark the entire map dirty to trigger that initial resync.
+ */
+ if (dirty)
+ rmr_map_set_dirty_all(map, MAP_NO_FILTER);
+
+ mode = RMR_POOL_INFO_MODE_CREATE;
+ } else {
+ /*
+ * For assemble, read pool_md first so we know all member IDs,
+ * then create maps for every member in the pool.
+ */
+ mode = RMR_POOL_INFO_MODE_ASSEMBLE;
+
+ err = rmr_clt_read_pool_md(pool_sess, !dirty);
+ if (err) {
+ pr_err("%s: failed to read pool_md from sess %s: %d\n",
+ __func__, pool_sess->sessname, err);
+ goto out;
+ }
+
+ if (!dirty) {
+ for (i = 0; i < RMR_POOL_MAX_SESS; i++) {
+ u8 mid = pool->pool_md.srv_md[i].member_id;
+
+ if (!mid)
+ continue;
+
+ map = rmr_map_create(pool, mid);
+ if (IS_ERR(map)) {
+ err = PTR_ERR(map);
+ pr_err("%s: pool %s failed to create map for member_id %u\n",
+ __func__, pool->poolname, mid);
+ goto del_maps;
+ }
+ created_mids[created_cnt++] = mid;
+ }
+ }
+ }
+
+ /*
+ * We need to send the info about this node joining to other storage nodes.
+ */
+ err = rmr_clt_send_pool_info(pool_sess, RMR_POOL_INFO_OP_ADD, mode, dirty);
+ if (err) {
+ pr_err("rmr_clt_send_pool_info failed for session %s\n",
+ pool_sess->sessname);
+ if (create)
+ rmr_pool_remove_map(pool, pool_sess->member_id);
+ else
+ goto del_maps;
+ goto out;
+ }
+
+ if (!create) {
+ pool_sess_change_state(pool_sess, RMR_CLT_POOL_SESS_RECONNECTING);
+ err = rmr_clt_pool_try_enable(pool);
+ if (err)
+ pr_err("%s: pool %s try_enable failed for sess %s: %d\n",
+ __func__, pool->poolname, pool_sess->sessname, err);
+ }
+
+ return err;
+
+del_maps:
+ for (i = 0; i < created_cnt; i++)
+ rmr_pool_remove_map(pool, created_mids[i]);
+out:
+ return err;
+}
+
+/**
+ * rmr_clt_add_pool_sess() - Add a client session to an RMR pool
+ *
+ * @pool: The pool to join.
+ * @clt_sess: The client transport session to associate.
+ * @create: True if this is a fresh pool creation; false for an assemble of an
+ * existing pool.
+ *
+ * Sends a join_pool command to the server, allocates a pool session, creates
+ * the dirty map for this storage node (for non-sync pools), and notifies the
+ * other pool members via a pool_info message.
+ *
+ * Return:
+ * Pointer to the new pool session on success, ERR_PTR on failure.
+ */
+struct rmr_clt_pool_sess *rmr_clt_add_pool_sess(struct rmr_pool *pool,
+ struct rmr_clt_sess *clt_sess, bool create)
+{
+ struct rmr_clt_pool *clt_pool;
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_pool_md *clt_md;
+ int err, idx;
+ bool dirty = false;
+
+ mutex_lock(&pool->sess_lock);
+
+ if (__find_sess_by_name(pool, clt_sess->sessname)) {
+ pr_err("Session '%s' already exists in pool %s\n",
+ clt_sess->sessname, pool->poolname);
+ err = -EEXIST;
+ goto err_out;
+ }
+
+ pool_sess = alloc_pool_sess(pool, clt_sess);
+ if (IS_ERR(pool_sess)) {
+ pr_err("pool session '%s' can not be allocated in pool %s\n",
+ clt_sess->sessname, pool->poolname);
+ err = PTR_ERR(pool_sess);
+ goto err_out;
+ }
+
+ clt_pool = (struct rmr_clt_pool *)pool->priv;
+
+	/* TODO handle case where tags are already initialized */
+ clt_pool->queue_depth = clt_sess->queue_depth;
+ clt_md = &clt_pool->pool->pool_md;
+ clt_md->queue_depth = clt_sess->queue_depth;
+
+ if (!pool->sync)
+ dirty = !list_empty(&pool->sess_list);
+
+ err = send_msg_join_pool(pool_sess, create, dirty, WAIT);
+ if (unlikely(err)) {
+ pr_err("send_msg_join_pool error %d\n", err);
+ goto free_sess;
+ }
+
+ /*
+ * Now that we have the member_id of the new storage node,
+ * check if it is unique.
+ */
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+ if (__find_sess_by_member_id(pool, pool_sess->member_id)) {
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+ pr_err("%s: Session with member_id %u already exists\n",
+ __func__, pool_sess->member_id);
+ err = -EEXIST;
+ goto err_leave_pool;
+ }
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+ list_add_tail_rcu(&pool_sess->entry, &pool->sess_list);
+
+ if (!pool->sync) {
+ err = rmr_clt_process_non_sync_sess(pool_sess, create, dirty);
+ if (err) {
+ pr_err("%s: rmr_clt_process_non_sync_sess failed for sess %s with err %d\n",
+ __func__, clt_sess->sessname, err);
+ goto rem_from_list;
+ }
+	} else {
+		pool_sess_change_state(pool_sess, RMR_CLT_POOL_SESS_NORMAL);
+	}
+
+ mutex_unlock(&pool->sess_lock);
+
+ mutex_lock(&clt_sess->lock);
+ list_add_tail(&pool_sess->clt_sess_entry, &clt_sess->pool_sess_list);
+ mutex_unlock(&clt_sess->lock);
+
+ return pool_sess;
+
+rem_from_list:
+ rmr_clt_del_pool_sess(pool_sess);
+err_leave_pool:
+ send_msg_leave_pool(pool_sess, create, WAIT);
+free_sess:
+ rmr_clt_free_pool_sess(pool_sess);
+err_out:
+ mutex_unlock(&pool->sess_lock);
+ return ERR_PTR(err);
+}
+
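+/*
+ * Usage sketch (illustrative, error handling elided): assembling an
+ * existing pool combines the transport-session lookup with the pool join.
+ *
+ *	clt_sess = find_and_get_or_create_clt_sess(sessname, paths, path_cnt);
+ *	if (IS_ERR(clt_sess))
+ *		return PTR_ERR(clt_sess);
+ *	pool_sess = rmr_clt_add_pool_sess(pool, clt_sess, false);
+ *	if (IS_ERR(pool_sess))
+ *		rmr_clt_sess_put(clt_sess);
+ *
+ * Passing create=true instead makes this the first join of a fresh pool.
+ */
+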
+/* Requires g_sess_lock to be held */
+static struct rmr_clt_sess *__find_and_get_clt_sess(const char *sessname)
+{
+ struct rmr_clt_sess *sess, *sn;
+
+again:
+	list_for_each_entry_safe(sess, sn, &g_sess_list, g_list) {
+ if (strcmp(sessname, sess->sessname))
+ continue;
+
+ if (rmr_clt_sess_get(sess))
+ return sess;
+
+ pr_info("failed to get ref for sess %s\n", sessname);
+		goto again; /* raced with a session being torn down; rescan */
+ }
+
+ return NULL;
+}
+
+struct rmr_clt_sess *find_and_get_or_create_clt_sess(char *sessname,
+ struct rtrs_addr *paths,
+ size_t path_cnt)
+{
+ struct rmr_clt_sess *sess;
+
+ mutex_lock(&g_sess_lock);
+ sess = __find_and_get_clt_sess(sessname);
+ mutex_unlock(&g_sess_lock);
+
+ if (!sess) {
+ pr_info("%s: Cannot find rmr_clt_sess with name %s\n", __func__, sessname);
+ sess = rmr_clt_create_sess(sessname, paths, path_cnt);
+		if (IS_ERR(sess))
+			return sess;
+ pr_info("%s: rmr_clt_sess %s created\n", __func__, sessname);
+ }
+
+ return sess;
+}
+
+/**
+ * rmr_clt_del_pool_sess() - Remove a session from the pool session list.
+ * @pool_sess: Pool session to remove.
+ *
+ * Removes @pool_sess from the pool's session list, waits for any in-progress
+ * SRCU readers to finish, and clears any per-CPU cached references to it.
+ *
+ * Context: Caller must hold pool->sess_lock.
+ */
+void rmr_clt_del_pool_sess(struct rmr_clt_pool_sess *pool_sess)
+{
+ int cpu;
+ bool dosync = false;
+ struct rmr_clt_pool_sess __rcu **ppcpu_sess;
+ struct rmr_pool *pool = pool_sess->pool;
+ struct rmr_clt_pool *clt_pool = (struct rmr_clt_pool *)pool->priv;
+
+ list_del_rcu(&pool_sess->entry);
+ synchronize_srcu(&pool->sess_list_srcu);
+
+ for_each_possible_cpu(cpu) {
+ preempt_disable();
+ ppcpu_sess = per_cpu_ptr(clt_pool->pcpu_sess, cpu);
+ if (pool_sess == rcu_access_pointer(*ppcpu_sess)) {
+ rcu_assign_pointer(*ppcpu_sess, NULL);
+ dosync = true;
+ }
+ preempt_enable();
+ }
+
+ if (dosync)
+ synchronize_srcu(&pool->sess_list_srcu);
+}
+
+/**
+ * rmr_clt_destroy_pool_sess() - Send leave_pool and free a pool session
+ *
+ * @pool_sess: Pool session to destroy.
+ * @delete: True for a permanent pool deletion; false for a temporary
+ * disassembly. This flag is forwarded in the leave_pool message
+ * so the server can act accordingly.
+ */
+void rmr_clt_destroy_pool_sess(struct rmr_clt_pool_sess *pool_sess, bool delete)
+{
+ struct rmr_clt_sess *clt_sess = pool_sess->clt_sess;
+
+ send_msg_leave_pool(pool_sess, delete, WAIT);
+ rmr_clt_free_pool_sess(pool_sess);
+ rmr_clt_sess_put(clt_sess);
+}
+
+static void rmr_clt_destroy_pool(struct rmr_pool *pool)
+{
+ struct rmr_clt_pool *clt_pool = (struct rmr_clt_pool *)pool->priv;
+ struct rmr_clt_pool_sess *pool_sess, *tmp;
+
+ destroy_clt_pool(pool);
+
+	list_for_each_entry_safe(pool_sess, tmp, &pool->sess_list, entry) {
+ mutex_lock(&pool->sess_lock);
+ list_del_rcu(&pool_sess->entry);
+ mutex_unlock(&pool->sess_lock);
+
+ rmr_clt_destroy_pool_sess(pool_sess, false /* never delete */);
+ }
+
+ rmr_put_clt_pool(clt_pool);
+}
+
+int rmr_clt_remove_pool_from_sysfs(struct rmr_pool *pool,
+ const struct attribute *sysfs_self)
+{
+ struct rmr_clt_pool *clt_pool = (struct rmr_clt_pool *)pool->priv;
+
+ if (!pool->sync)
+ cancel_delayed_work_sync(&clt_pool->recover_dwork);
+
+ rmr_clt_destroy_pool_sysfs_files(pool, sysfs_self);
+ rmr_clt_destroy_pool(pool);
+ return 0;
+}
+
+/*
+ * Pre-requisite: rcu read lock should be held by caller
+ */
+static struct rmr_clt_pool_sess *
+rmr_clt_next_sess(struct rmr_pool *pool, struct rmr_clt_pool_sess *prev)
+{
+ struct rmr_clt_pool_sess *next;
+
+ next = list_next_or_null_rcu(&pool->sess_list,
+ &prev->entry,
+ struct rmr_clt_pool_sess,
+ entry);
+ if (next)
+ return next;
+
+ return list_first_or_null_rcu(&pool->sess_list,
+ struct rmr_clt_pool_sess,
+ entry);
+}
+
+static inline bool rmr_clt_pool_sess_in_iu(struct rmr_iu *iu,
+					   struct rmr_clt_pool_sess *pool_sess)
+{
+	struct rmr_clt_sess_iu *sess_iu;
+
+	/* No entries are removed here, so the _safe iterator is not needed */
+	list_for_each_entry(sess_iu, &iu->sess_list, entry) {
+		if (sess_iu->pool_sess == pool_sess)
+			return true;
+	}
+
+	return false;
+}
+
+/*
+ * Pre-requisite: rcu read lock should be held by caller
+ */
+static struct rmr_clt_pool_sess *rmr_clt_round_robin_sess(struct rmr_pool *pool,
+ struct rmr_iu *iu)
+{
+ struct rmr_clt_pool_sess *old, *next, *pool_sess;
+ struct rmr_clt_pool *clt_pool;
+ struct rmr_clt_pool_sess __rcu **ppcpu_sess;
+
+ clt_pool = (struct rmr_clt_pool *)pool->priv;
+ ppcpu_sess = this_cpu_ptr(clt_pool->pcpu_sess);
+
+ if (iu) {
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ if (rmr_clt_pool_sess_in_iu(iu, pool_sess))
+ continue;
+
+ rcu_assign_pointer(*ppcpu_sess, pool_sess);
+ return pool_sess;
+ }
+
+ return NULL;
+ }
+
+ old = rcu_dereference(*ppcpu_sess);
+ if (!old) {
+ next = rmr_clt_get_first_normal_session(pool);
+ if (!next)
+ return NULL;
+ rcu_assign_pointer(*ppcpu_sess, next);
+ return next;
+ }
+
+ for (next = rmr_clt_next_sess(pool, old);
+ next && next != old;
+ next = rmr_clt_next_sess(pool, next)) {
+ /*
+ * It could happen that the state of pool_sess hasn't been able to
+ * represent the recent rtrs-clt sess state.
+ */
+ if (next->clt_sess->state == RMR_CLT_SESS_DISCONNECTED)
+ continue;
+
+ if (atomic_read(&next->state) == RMR_CLT_POOL_SESS_NORMAL) {
+ rcu_assign_pointer(*ppcpu_sess, next);
+ return next;
+ }
+ }
+
+ /*
+ * There may be just one session with normal state i.e. old.
+ * In this case per-cpu sess pointer does not need update.
+ */
+ return rmr_clt_get_first_normal_session(pool);
+}
+
+int rmr_clt_query(struct rmr_pool *pool, struct rmr_attrs *attr)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_clt_pool *clt_pool = (struct rmr_clt_pool *)pool->priv;
+ int idx;
+
+ if (unlikely(!clt_pool))
+ return -EINVAL;
+
+ attr->chunk_size = pool->chunk_size;
+ attr->sync = pool->sync;
+
+ attr->queue_depth = U32_MAX;
+ attr->max_io_size = U32_MAX;
+ attr->max_segments = U32_MAX;
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+
+ if (list_empty(&pool->sess_list)) {
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+ return -ENOENT;
+ }
+
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ struct rmr_clt_sess *clt_sess = pool_sess->clt_sess;
+
+		attr->queue_depth = min_t(u32, clt_sess->queue_depth, attr->queue_depth);
+ attr->max_io_size = min_t(u32, clt_sess->max_io_size, attr->max_io_size);
+ attr->max_segments = min_t(u32, clt_sess->max_segments, attr->max_segments);
+ }
+ attr->pool_kobj = &(pool->kobj);
+
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+ return 0;
+}
+EXPORT_SYMBOL(rmr_clt_query);
+
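+/*
+ * Usage sketch (illustrative): a block-device user sizes its queues from
+ * the pool-wide minima computed above.
+ *
+ *	struct rmr_attrs attrs;
+ *
+ *	err = rmr_clt_query(pool, &attrs);
+ *	if (!err)
+ *		setup_queues(attrs.queue_depth, attrs.max_io_size,
+ *			     attrs.max_segments);
+ *
+ * setup_queues() is a hypothetical stand-in for whatever the caller does
+ * with the limits.
+ */
+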
+struct rmr_iu *rmr_clt_get_iu(struct rmr_pool *pool, enum rmr_io_flags flag,
+ enum rmr_wait_type wait)
+{
+ int err = 0, idx;
+ struct rmr_clt_pool *clt_pool;
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_iu *iu;
+ struct rmr_clt_sess_iu *sess_iu, *tmp_sess_iu;
+ bool reset = false;
+
+ clt_pool = (struct rmr_clt_pool *)pool->priv;
+
+ if (!test_bit(RMR_CLT_POOL_STATE_IN_USE, &clt_pool->state)) {
+ pr_err("%s: Pool %s not in use state\n", __func__, pool->poolname);
+ rmr_clt_dump_state(clt_pool);
+ return NULL;
+ }
+
+	/*
+	 * Take the inflight ref first. If an IO freeze is in progress, put
+	 * the ref back and wait for the unfreeze.
+	 *
+	 * The while loop protects us from back-to-back freezes, e.g. a leg
+	 * deletion followed immediately by a call to rmr_clt_close.
+	 *
+	 * We are guaranteed not to loop forever: rmr_clt_close can be called
+	 * only once, and there is a limited number of legs to delete.
+	 */
+ percpu_ref_get(&pool->ids_inflight_ref);
+ while (atomic_read(&clt_pool->io_freeze) > 0) {
+ percpu_ref_put(&pool->ids_inflight_ref);
+		/*
+		 * The caller may hold the RCU read lock when we reach this
+		 * wait, which would violate the rule against sleeping inside
+		 * an RCU critical section. Temporarily release the RCU lock
+		 * and re-acquire it after waking up.
+		 *
+		 * TODO: This approach is simple but may need to be revisited.
+		 */
+ if (rcu_read_lock_held()) {
+ rcu_read_unlock();
+ reset = true;
+ }
+
+ wait_event(clt_pool->map_update_wq, !atomic_read(&clt_pool->io_freeze));
+
+ if (reset)
+ rcu_read_lock();
+
+		/*
+		 * Once IO is unfrozen, check whether the pool state has
+		 * changed. rmr_clt_close may have been called, clearing
+		 * IN_USE, or the last leg may have been deleted, clearing
+		 * JOINED.
+		 *
+		 * In both cases we cannot service IOs, so fail.
+		 */
+ if (!test_bit(RMR_CLT_POOL_STATE_IN_USE, &clt_pool->state) ||
+ !test_bit(RMR_CLT_POOL_STATE_JOINED, &clt_pool->state)) {
+ pr_err("%s: Failed to get inflight IO ref.\n", __func__);
+ pr_err("%s: Pool %s is not joined or used\n",
+ __func__, pool->poolname);
+ rmr_clt_dump_state(clt_pool);
+ return NULL;
+ }
+
+ percpu_ref_get(&pool->ids_inflight_ref);
+ }
+
+ iu = rmr_alloc_iu();
+ if (unlikely(!iu)) {
+ percpu_ref_put(&pool->ids_inflight_ref);
+ return NULL;
+ }
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+ if (rmr_op(flag) == RMR_OP_READ) {
+ /*
+ * Round robin use of one of the sessions in normal state for READ.
+ *
+ * This call is always from rmr_clt_request, so for READ,
+ * this is the first pool_sess we are trying
+ */
+ pool_sess = rmr_clt_round_robin_sess(pool, NULL);
+ if (unlikely(!pool_sess)) {
+ err = -ENODEV;
+ goto put_iu;
+ }
+
+ sess_iu = rmr_get_sess_iu(pool_sess, RTRS_IO_CON, (enum wait_type) wait);
+ if (unlikely(!sess_iu))
+ goto put_iu;
+
+ sess_iu->rmr_iu = iu;
+ iu->num_sessions = 1;
+ list_add_tail(&(sess_iu->entry), (&iu->sess_list));
+ } else {
+ /*
+ * For WRITE operations we need to submit to all sessions.
+ */
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ /* Sessions must be in normal state for I/O */
+ if (atomic_read(&pool_sess->state) != RMR_CLT_POOL_SESS_NORMAL)
+ continue;
+
+ sess_iu = rmr_get_sess_iu(pool_sess,
+ RTRS_IO_CON, (enum wait_type) wait);
+ if (unlikely(!sess_iu))
+ goto put_sessions;
+
+ sess_iu->rmr_iu = iu;
+ /*
+ * The mem_id of sess_iu tracks the next free slot in the permit bitmap
+ * of an RTRS-clt session, which is used to store write IO chunk info by
+ * RMR-server.
+ */
+ sess_iu->mem_id = sess_iu->permit->mem_id;
+ iu->num_sessions++;
+ list_add_tail(&(sess_iu->entry), (&iu->sess_list));
+ }
+ }
+
+ refcount_set(&iu->refcount, iu->num_sessions);
+ iu->errno = 0;
+
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+ return iu;
+
+put_sessions:
+ list_for_each_entry_safe(sess_iu, tmp_sess_iu,
+ &(iu->sess_list), entry) {
+ if (!list_empty(&sess_iu->entry))
+ list_del_init(&sess_iu->entry);
+ rmr_put_sess_iu(sess_iu->pool_sess, sess_iu);
+ }
+put_iu:
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+ rmr_put_iu(iu);
+ percpu_ref_put(&pool->ids_inflight_ref);
+
+ if (err)
+ return ERR_PTR(err);
+
+ return NULL;
+}
+EXPORT_SYMBOL(rmr_clt_get_iu);
+
+void rmr_clt_put_iu(struct rmr_pool *pool, struct rmr_iu *iu)
+{
+ rmr_put_iu(iu);
+ percpu_ref_put(&pool->ids_inflight_ref);
+}
+EXPORT_SYMBOL(rmr_clt_put_iu);
+
+/*
+ * Returns 1 if the errno represents a condition in the storage server
+ * that prevents the operation from being executed. The opposite is an
+ * error with respect to the storage server where the operation can be
+ * retried on a different one.
+ *
+ * An example is an attempt to read a block that does not exist, versus
+ * the server having crashed.
+ *
+ * Note that when in doubt we have to trigger the retry.
+ */
+/*
+static inline int rmr_is_op_error(int errno)
+{
+ switch (-errno) {
+ case ENOENT:
+ case EINVAL:
+ case EEXIST:
+ case ENODEV:
+ return 1;
+ default:
+ return 0;
+ }
+}
+*/
+
+static void msg_read_conf(void *priv, int errno)
+{
+ struct rmr_clt_sess_iu *sess_iu = (struct rmr_clt_sess_iu *)priv;
+ struct rmr_clt_pool_sess *pool_sess = sess_iu->pool_sess;
+ struct rmr_iu *iu = sess_iu->rmr_iu;
+ rmr_conf_fn *clt_conf = iu->conf;
+
+ WARN_ON(atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_CREATED);
+
+ if (errno) {
+ if (!iu->errno)
+ /* only first error is reported */
+ iu->errno = errno;
+
+ pr_err_ratelimited("%s got errno: %d for session %d. Schedule retry.\n",
+ __func__, errno, pool_sess->member_id);
+ if (!pool_sess->pool->sync)
+ pool_sess_change_state(pool_sess, RMR_CLT_POOL_SESS_FAILED);
+
+ INIT_WORK(&iu->work, retry_failed_read);
+ schedule_work(&iu->work);
+ } else {
+ (*clt_conf)(iu->priv, errno);
+ }
+}
+
+static void retry_failed_read(struct work_struct *work)
+{
+ struct rmr_iu *iu = container_of(work, struct rmr_iu, work);
+ struct rmr_pool *pool = iu->pool;
+ struct rmr_clt_pool *clt_pool = (struct rmr_clt_pool *)pool->priv;
+ rmr_conf_fn *clt_conf = iu->conf;
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_clt_sess_iu *sess_iu;
+ struct rtrs_clt_req_ops req_ops;
+ struct kvec vec;
+ int err, idx;
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+
+ pool_sess = rmr_clt_round_robin_sess(pool, iu);
+ if (!pool_sess)
+ goto give_up;
+
+ sess_iu = rmr_get_sess_iu(pool_sess, RTRS_IO_CON, RTRS_PERMIT_WAIT);
+ if (unlikely(!sess_iu))
+ goto give_up;
+
+ pr_debug("%s: Pool %s to session %d, chunk [%llu, %llu]\n",
+ __func__, pool->poolname, pool_sess->member_id,
+ le64_to_cpu(iu->msg.id_a), le64_to_cpu(iu->msg.id_b));
+
+ sess_iu->rmr_iu = iu;
+ iu->msg.member_id = pool_sess->member_id;
+ atomic_inc(&clt_pool->stats.read_retries);
+
+ list_add_tail(&(sess_iu->entry), (&iu->sess_list));
+
+ vec = (struct kvec) {
+ .iov_base = &iu->msg,
+ .iov_len = sizeof(iu->msg)
+ };
+
+ req_ops = (struct rtrs_clt_req_ops) {
+ .priv = sess_iu,
+ .conf_fn = msg_read_conf,
+ };
+
+ trace_retry_failed_read(READ, sess_iu);
+
+ err = rtrs_clt_request(RMR_OP_READ, &req_ops, pool_sess->clt_sess->rtrs, sess_iu->permit,
+ &vec, 1, le32_to_cpu(iu->msg.length), iu->sg, iu->sg_cnt);
+
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+ if (err)
+ /* beware! recursion!! */
+ msg_read_conf(sess_iu, err);
+
+ return;
+give_up:
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+ /* recursion termination! */
+ (*clt_conf)(iu->priv, iu->errno);
+}
+
+/*
+static int rmr_clt_map_remove_id(struct rmr_pool *pool, int srv_id, rmr_id_t id)
+{
+ struct rmr_dirty_id_map *map;
+
+ pr_debug("pool %s, remove id (%llu, %llu) for stg_id %d\n",
+ pool->poolname, id.a, id.b, srv_id);
+
+ map = rmr_pool_find_map(pool, srv_id);
+ if (!map) {
+ pr_err("pool %s no map found for pool_id %u\n",
+ pool->poolname, srv_id);
+ return -EINVAL;
+ //TODO: handle this , probably initialize map, or just throw err?
+ }
+
+ if (!rmr_map_empty(map)) {
+ void *val;
+
+ val = rmr_map_find(map, id);
+ if (!val) {
+ pr_debug("pool %s value for id (%llu, %llu) is not in the dirty map\n",
+ pool->poolname, id.a, id.b);
+ return 0;
+ }
+ rmr_map_erase(map, id);
+ pr_debug("pool %s, id (%llu, %llu) is removed from map for stg_id %d\n",
+ pool->poolname, id.a, id.b, srv_id);
+ }
+
+ return 0;
+}
+*/
+
+static void msg_io_conf(void *priv, int errno)
+{
+ struct rmr_clt_sess_iu *sess_iu = (struct rmr_clt_sess_iu *)priv;
+ struct rmr_clt_pool_sess *pool_sess = sess_iu->pool_sess;
+ struct rmr_iu *iu = sess_iu->rmr_iu;
+ rmr_conf_fn *clt_conf = iu->conf;
+ void *clt_priv = iu->priv;
+
+ WARN_ON(atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_CREATED);
+ WARN_ON(pool_sess->pool->sync);
+
+ if (errno) {
+ pr_err("%s: For sess %s, id (%llu, %llu), got errno: %d\n",
+ __func__, pool_sess->sessname, iu->msg.id_a, iu->msg.id_b, errno);
+ sess_iu->errno = errno;
+ if (!iu->errno)
+ /* only first error is reported */
+ iu->errno = errno;
+ pool_sess_change_state(pool_sess, RMR_CLT_POOL_SESS_FAILED);
+ pr_debug("iu->errno %d, errno %d, before dec refcnt %d\n",
+ iu->errno, errno, refcount_read(&iu->refcount));
+ } else {
+ atomic_inc(&iu->succeeded);
+ // TODO: is it ok to clear it here?
+ // rmr_clt_map_remove_id(session->pool, session->pool_id, iu->id);
+ }
+
+ pr_debug("called for id (%llu, %llu), errno %d, sessname %s\n",
+ iu->msg.id_a, iu->msg.id_b, errno, pool_sess->sessname);
+
+ if (refcount_dec_and_test(&iu->refcount)) {
+ if (atomic_read(&iu->succeeded) == 0) {
+ /*
+ * None of the IOs succeeded.
+ * Map add is not needed; Just fail the IO.
+ */
+ pr_err("Write IO failed. Passing it up. errno %d\n", iu->errno);
+ (*clt_conf)(clt_priv, iu->errno);
+ } else if (iu->errno) {
+ /*
+ * Some IOs failed. Send map update (add).
+ * The clt conf will be called when map update is done.
+ *
+ * We are using the same iu to send map update
+ * So reset the refcount.
+ */
+ refcount_set(&iu->refcount, iu->num_sessions);
+
+ /*
+ * we are in interrupt here, so sched map update
+ */
+ pr_debug("%s: some IOs failed for %s. Starts map_add\n", __func__,
+ pool_sess->sessname);
+ INIT_WORK(&iu->work, sched_map_add);
+ schedule_work(&iu->work);
+ } else {
+ /*
+ * All good.
+ */
+ errno = 0;
+ (*clt_conf)(clt_priv, errno);
+ }
+ }
+}
+
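+/*
+ * Summary of the write fan-in outcomes above, once the last session
+ * confirmation drops the iu refcount to zero:
+ *
+ *	succeeded == 0                -> fail the IO up with iu->errno
+ *	succeeded > 0, iu->errno != 0 -> schedule sched_map_add() to mark the
+ *	                                 failed members dirty; conf runs later
+ *	succeeded > 0, iu->errno == 0 -> complete the IO with success
+ */
+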
+static inline void rmr_clt_put_cu(struct rmr_clt_cmd_unit *cmd_unit)
+{
+ percpu_ref_put(&cmd_unit->clt_pool->pool->ids_inflight_ref);
+ kfree(cmd_unit);
+}
+
+/**
+ * msg_cmd_conf() - Confirmation function called for command user commands sent
+ *
+ * priv: Pointer to private data passed to rtrs. sess_iu in this case.
+ * errno: error status passed by rtrs
+ */
+static void msg_cmd_conf(void *priv, int errno)
+{
+ struct rmr_clt_sess_iu *sess_iu = (struct rmr_clt_sess_iu *)priv;
+ struct rmr_clt_cmd_unit *cmd_unit = sess_iu->rmr_cmd_unit;
+ rmr_conf_fn *clt_conf = cmd_unit->conf;
+ void *clt_priv = cmd_unit->priv;
+ int total_failed;
+
+ pr_debug("%s: sessname:%s, errno=%d\n", __func__, sess_iu->pool_sess->sessname, errno);
+ if (!errno)
+ atomic_inc(&cmd_unit->succeeded);
+
+ if (refcount_dec_and_test(&cmd_unit->refcount)) {
+ if (atomic_read(&cmd_unit->succeeded) == 0) {
+ /*
+ * None of the IOs succeeded.
+ */
+ pr_err("CMD failed with err %pe. Passing it up.\n", ERR_PTR(errno));
+ (*clt_conf)(clt_priv, errno);
+ } else {
+ total_failed = cmd_unit->failed_state +
+ (cmd_unit->num_sessions - atomic_read(&cmd_unit->succeeded));
+ /*
+ * Pass the number of failures up to the user.
+ */
+ (*clt_conf)(clt_priv, total_failed);
+ }
+
+ rmr_clt_put_cu(cmd_unit);
+ }
+
+ rmr_put_sess_iu(sess_iu->pool_sess, sess_iu);
+}
+
+/* The amount of data that belongs to an I/O and the amount of data that
+ * should be read or written to the disk (bi_size) can differ.
+ *
+ * E.g. When WRITE_SAME is used, only a small amount of data is
+ * transferred that is then written repeatedly over a lot of sectors.
+ *
+ * Get the size of data to be transferred via RTRS by summing up the size
+ * of the scatter-gather list entries.
+ */
+static size_t rmr_clt_get_sg_size(struct scatterlist *sglist, u32 len)
+{
+ struct scatterlist *sg;
+ size_t tsize = 0;
+ int i;
+
+ for_each_sg(sglist, sg, len, i)
+ tsize += sg->length;
+ return tsize;
+}
+
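+/*
+ * Worked example for the comment above (numbers illustrative): a
+ * WRITE_SAME covering 1 MiB of disk may carry a single 4 KiB payload
+ * page, so bi_size is 1 MiB while rmr_clt_get_sg_size() returns 4 KiB,
+ * which is what actually crosses the wire via RTRS.
+ */
+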
+/**
+ * rmr_clt_request() - Request data transfer to/from storage node via given pool
+ *
+ * @pool: The Pool
+ * @iu: IU allocated by a previous rmr_clt_get_iu() call.
+ * @offset: offset inside the object to read/write
+ * @length: length of data starting from offset
+ * @flag: READ/WRITE/REMOVE
+ * @prio: priority of IO
+ * @priv: User provided data, passed back with the corresponding
+ * @conf confirmation.
+ * @conf: callback function to be called as confirmation
+ * @sg: Pages to be sent/received to/from server.
+ * @sg_cnt: Number of elements in @sg
+ *
+ * Description:
+ * Transfers data through the given pool over the underlying RTRS/RDMA
+ * transport. While sending write IOs, if there are FAILED or RECONNECTING
+ * pool sessions, the IO is recorded as dirty for those sessions.
+ * This is used both by the pserver client and by the rmr server on the
+ * storage node to perform sync reads.
+ *
+ * Return:
+ * 0 on success, meaning the IO was sent; final confirmation arrives via @conf.
+ * Error value on failure
+ */
+int rmr_clt_request(struct rmr_pool *pool, struct rmr_iu *iu,
+ size_t offset, size_t length, enum rmr_io_flags flag, unsigned short prio,
+ void *priv, rmr_conf_fn *conf, struct scatterlist *sg, unsigned int sg_cnt)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_clt_sess_iu *sess_iu, *tmp_sess_iu;
+ struct rtrs_clt_req_ops req_ops;
+	rmr_id_t id = {}; /* stays zero for FLUSH and zero-length ops */
+ struct kvec vec;
+ size_t sg_len;
+ int dir, err, idx;
+ u32 rmr_flag;
+
+ rmr_get_iu(iu);
+ rmr_flag = rmr_op(flag);
+ dir = (rmr_flag == RMR_OP_READ) ? READ : WRITE;
+
+ sg_len = rmr_clt_get_sg_size(sg, sg_cnt);
+ if (!(flag & RMR_OP_DISCARD || flag & RMR_OP_WRITE_ZEROES))
+ WARN_ON(length != sg_len);
+
+ iu->msg.hdr.group_id = cpu_to_le32(pool->group_id);
+ iu->msg.hdr.type = cpu_to_le16(RMR_MSG_IO);
+ iu->msg.hdr.__padding = 0;
+
+ iu->msg.offset = cpu_to_le32(offset);
+ iu->msg.length = cpu_to_le32(length);
+ iu->msg.flags = cpu_to_le32(flag);
+ iu->msg.prio = cpu_to_le16(prio);
+
+ iu->msg.sync = pool->sync;
+
+ iu->priv = priv;
+ iu->conf = conf;
+ iu->pool = pool;
+
+ if (rmr_flag != RMR_OP_FLUSH && sg_len) {
+ rmr_map_calc_chunk(pool, offset, length, &id);
+ /*
+ * We are not ready to process IO requests which are across chunk boundary.
+ * The main area which needs work is triggering sync IO (see rmr-req.c) which
+ * holding the IO which touches multiple chunks. And then making sure other IOs
+ * which overlap these chunks are held properly, and restarted once the corresponding
+ * chunk is synced.
+ */
+ BUG_ON(id.a > 1);
+ iu->msg.id_a = cpu_to_le64(id.a);
+ iu->msg.id_b = cpu_to_le64(id.b);
+ }
+
+ if (rmr_flag == RMR_OP_READ) {
+ iu->sg = sg;
+ iu->sg_cnt = sg_cnt;
+ } else if (!pool->sync && rmr_flag == RMR_OP_WRITE) {
+ /*
+ * We take this path only for request from client side
+ * Never from rmr_req_remote_read.
+ */
+ int failed_cnt = 0;
+ int i;
+
+ atomic_set(&iu->succeeded, 0);
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+ for (i = 0; i < RMR_POOL_MAX_SESS; i++) {
+ struct rmr_clt_pool_sess *ps;
+ enum rmr_clt_pool_sess_state state;
+ u8 mid = pool->pool_md.srv_md[i].member_id;
+
+ if (!mid)
+ continue;
+
+ ps = xa_load(&pool->stg_members, mid);
+ if (ps) {
+ state = atomic_read(&ps->state);
+ if (state != RMR_CLT_POOL_SESS_FAILED &&
+ state != RMR_CLT_POOL_SESS_RECONNECTING)
+ continue;
+ }
+ /* ps == NULL (disassembled) or FAILED/RECONNECTING */
+ if (WARN_ON(failed_cnt >= RMR_POOL_MAX_SESS))
+ break;
+ iu->msg.map_ver = cpu_to_le64(pool->map_ver);
+ iu->msg.failed_id[failed_cnt] = mid;
+ failed_cnt++;
+ rmr_clt_map_add_id(pool, mid, id);
+ }
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+ iu->msg.failed_cnt = failed_cnt;
+	} else if (pool->sync) {
+		pr_err("%s: Sync pools do not process writes\n", __func__);
+		/* drop the reference taken by rmr_get_iu() above */
+		rmr_put_iu(iu);
+		return -EPERM;
+	}
+
+ vec = (struct kvec) {
+ .iov_base = &iu->msg,
+ .iov_len = sizeof(iu->msg)
+ };
+
+ list_for_each_entry_safe(sess_iu, tmp_sess_iu,
+ &(iu->sess_list), entry) {
+ struct rmr_clt_sess *clt_sess;
+
+ pool_sess = sess_iu->pool_sess;
+ clt_sess = pool_sess->clt_sess;
+ iu->msg.member_id = pool_sess->member_id;
+
+ if (atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_REMOVING ||
+ pool_sess->maintenance_mode) {
+ /*
+ * The storage for this session is getting removed from
+ * the pool, or is in maintenance mode.
+ * Simply complete this IO with error
+ */
+ err = -EAGAIN;
+ goto complete_io;
+ }
+
+ pr_debug("Sending %x request to pool %s session %s "
+ "chunk (%llu, %llu) offset %lu length %lu)\n",
+ rmr_flag,
+ pool->poolname, pool_sess->sessname,
+ id.a, id.b, offset, length);
+
+ if (rmr_flag == RMR_OP_READ) {
+ req_ops = (struct rtrs_clt_req_ops) {
+ .priv = sess_iu,
+ .conf_fn = msg_read_conf,
+ };
+ } else {
+ req_ops = (struct rtrs_clt_req_ops) {
+ .priv = sess_iu,
+ .conf_fn = msg_io_conf,
+ };
+
+ /*
+ * Update mem_id before transmitting each write IO to the corresponding
+ * server.
+ */
+ iu->msg.mem_id = cpu_to_le32(sess_iu->mem_id);
+ }
+
+ trace_rmr_clt_request(dir, sess_iu);
+
+ err = rtrs_clt_request(dir, &req_ops, clt_sess->rtrs,
+ sess_iu->permit, &vec, 1, sg_len,
+ sg, sg_cnt);
+
+complete_io:
+ if (err) {
+ if (rmr_flag == RMR_OP_READ)
+ msg_read_conf(sess_iu, err);
+ else
+ msg_io_conf(sess_iu, err);
+ }
+ }
+ rmr_put_iu(iu);
+
+ return 0;
+}
+EXPORT_SYMBOL(rmr_clt_request);
+
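+/*
+ * End-to-end usage sketch (illustrative, error handling elided): the IU
+ * is obtained first, then the request is fired; completion arrives via
+ * the conf callback. my_io_done() is a hypothetical callback.
+ *
+ *	static void my_io_done(void *priv, int errno) { ... }
+ *
+ *	iu = rmr_clt_get_iu(pool, RMR_OP_WRITE, WAIT);
+ *	if (iu)
+ *		err = rmr_clt_request(pool, iu, offset, len, RMR_OP_WRITE,
+ *				      prio, priv, my_io_done, sg, sg_cnt);
+ *
+ * The exact release point is an assumption here: the caller drops the
+ * IU with rmr_clt_put_iu(pool, iu) once the IO has completed.
+ */
+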
+/**
+ * rmr_clt_get_cu() - Allocate and return a command unit.
+ *
+ * @pool: rmr pool for which the command unit is to be allocated
+ *
+ * Description:
+ * Allocates and returns a command unit for the rmr pool. The command unit contains a list of
+ * session units, for each session which is not in the "REMOVING" state.
+ *
+ * Return:
+ * Pointer to the command unit
+ */
+static struct rmr_clt_cmd_unit *rmr_clt_get_cu(struct rmr_pool *pool)
+{
+ struct rmr_clt_pool *clt_pool = (struct rmr_clt_pool *)pool->priv;
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_clt_cmd_unit *cmd_unit;
+ struct rmr_clt_sess_iu *sess_iu, *tmp_sess_iu;
+ int idx;
+
+ if (!test_bit(RMR_CLT_POOL_STATE_IN_USE, &clt_pool->state)) {
+ pr_err("%s: Pool %s not in use\n", __func__, pool->poolname);
+ rmr_clt_dump_state(clt_pool);
+ return NULL;
+ }
+
+	/*
+	 * Take the inflight ref first. If an IO freeze is in progress, put
+	 * the ref back and wait for the unfreeze.
+	 *
+	 * The while loop protects us from back-to-back freezes, e.g. a leg
+	 * deletion followed immediately by a call to rmr_clt_close.
+	 *
+	 * We are guaranteed not to loop forever: rmr_clt_close can be called
+	 * only once, and there is a limited number of legs to delete.
+	 */
+ percpu_ref_get(&pool->ids_inflight_ref);
+ while (atomic_read(&clt_pool->io_freeze) > 0) {
+ percpu_ref_put(&pool->ids_inflight_ref);
+ wait_event(clt_pool->map_update_wq, !atomic_read(&clt_pool->io_freeze));
+
+		/*
+		 * Once IO is unfrozen, check whether the pool state has
+		 * changed. rmr_clt_close may have been called, clearing
+		 * IN_USE, or the last leg may have been deleted, clearing
+		 * JOINED.
+		 *
+		 * In both cases we cannot service IOs, so fail.
+		 */
+ if (!test_bit(RMR_CLT_POOL_STATE_IN_USE, &clt_pool->state) ||
+ !test_bit(RMR_CLT_POOL_STATE_JOINED, &clt_pool->state)) {
+ pr_err("%s: Failed to get inflight IO ref.\n", __func__);
+ pr_err("%s: Pool %s is not joined or used\n", __func__, pool->poolname);
+ rmr_clt_dump_state(clt_pool);
+ return NULL;
+ }
+
+ percpu_ref_get(&pool->ids_inflight_ref);
+ }
+
+ cmd_unit = kzalloc(sizeof(*cmd_unit), GFP_KERNEL);
+ if (!cmd_unit) {
+ percpu_ref_put(&pool->ids_inflight_ref);
+ return NULL;
+ }
+
+ INIT_LIST_HEAD(&cmd_unit->sess_list);
+ cmd_unit->pool = pool;
+ cmd_unit->clt_pool = clt_pool;
+ atomic_set(&cmd_unit->succeeded, 0);
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+	/*
+	 * Acquire the permits for all sessions. Continue only if we manage
+	 * to get permits for every session that is not REMOVING or FAILED.
+	 */
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ if (atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_REMOVING)
+ continue;
+
+ if (atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_FAILED) {
+ cmd_unit->failed_state++;
+ continue;
+ }
+
+ sess_iu = rmr_get_sess_iu(pool_sess, RTRS_ADMIN_CON, RTRS_PERMIT_NOWAIT);
+ if (unlikely(!sess_iu))
+ goto put_sessions;
+
+ sess_iu->rmr_cmd_unit = cmd_unit;
+
+ cmd_unit->num_sessions++;
+ list_add_tail(&(sess_iu->entry), (&cmd_unit->sess_list));
+ }
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+ refcount_set(&cmd_unit->refcount, cmd_unit->num_sessions);
+
+ return cmd_unit;
+
+put_sessions:
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+ /* Free sess_ius */
+ list_for_each_entry_safe(sess_iu, tmp_sess_iu,
+ &(cmd_unit->sess_list), entry) {
+ if (!list_empty(&sess_iu->entry))
+ list_del_init(&sess_iu->entry);
+ rmr_put_sess_iu(sess_iu->pool_sess, sess_iu);
+ }
+
+ rmr_clt_put_cu(cmd_unit);
+
+ return NULL;
+}
+
+/**
+ * rmr_clt_cmd_err_conf() - Calls confirmation function for commands
+ *
+ * @work: schedules work
+ *
+ * Description:
+ * In case of error in the user command path, we cannot call the confirmation function
+ * directly, since it might end up calling confirmation function of the user itself.
+ * Hence a work is scheduled to call the confirmation function in case the code for sending
+ * user commands itself fails.
+ */
+static void rmr_clt_cmd_err_conf(struct work_struct *work)
+{
+ struct rmr_clt_sess_iu *sess_iu = container_of(work, struct rmr_clt_sess_iu, work);
+
+ msg_cmd_conf(sess_iu, sess_iu->errno);
+}
+
+/**
+ * rmr_clt_cmd_with_rsp() - Sends a user command to all sessions of an rmr pool
+ *
+ * @pool: rmr pool to which the command is for
+ * @conf: confirmation function to be called after completion
+ * @priv: pointer to priv data, to be returned to user while calling conf function
+ * @usr_vec: kvec array carrying the user command payload
+ * @nr: number of kvecs
+ * @buf: buf where the response from the user server is to be directed
+ * The buf must be physically contiguous in memory (kmalloc()'d).
+ * @buf_len: length of the buffer
+ * @size: size of the buf to be sent to a single session
+ *
+ * Description:
+ * This function provides an interface for the user to send commands to the server side.
+ * The command is sent as a read, so that the response from the user srv side can be received
+ * The buffer sent by the user is meant to receive the response from the user server side.
+ * The size of the buffer is set during rmr_clt_open.
+ *
+ * Return:
+ * 0 on success
+ * negative errno in case of error
+ *
+ * Context:
+ * Inflight commands will block map update, until the inflights are completed.
+ */
+int rmr_clt_cmd_with_rsp(struct rmr_pool *pool, rmr_conf_fn *conf, void *priv,
+ const struct kvec *usr_vec, size_t nr, void *buf, int buf_len, size_t size)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_clt_sess_iu *sess_iu, *tmp_sess_iu;
+ struct rmr_clt_cmd_unit *cmd_unit;
+ struct rmr_msg_pool_cmd msg = {};
+ struct rtrs_clt_req_ops req_ops;
+ struct kvec *vec;
+ int i, j, err = 0;
+
+ /*
+ * TODO: kvmalloc() memory is yet to be supported for SG I/O.
+ */
+ if (is_vmalloc_addr(buf))
+ return -EINVAL;
+
+ if (buf_len != (RMR_POOL_MAX_SESS * size))
+ return -EINVAL;
+
+ rmr_clt_init_cmd(pool, &msg);
+ msg.cmd_type = RMR_CMD_USER;
+
+ /*
+ * RMR msg struct + user vecs
+ */
+	vec = kcalloc(1 + nr, sizeof(*vec), GFP_KERNEL);
+ if (!vec)
+ return -ENOMEM;
+
+ /*
+ * RMR msg struct first,
+ * followed by the user kvecs
+ */
+ vec[0].iov_base = &msg;
+ vec[0].iov_len = sizeof(msg);
+ for (i = 1, j = 0; j < nr; i++, j++) {
+ vec[i].iov_base = usr_vec[j].iov_base;
+ vec[i].iov_len = usr_vec[j].iov_len;
+
+ msg.user_cmd.usr_len += usr_vec[j].iov_len;
+ }
+
+ cmd_unit = rmr_clt_get_cu(pool);
+ if (!cmd_unit) {
+ err = -ENOMEM;
+ goto out;
+ }
+
+ cmd_unit->conf = conf;
+ cmd_unit->priv = priv;
+
+ i = 0;
+ list_for_each_entry_safe(sess_iu, tmp_sess_iu,
+ &(cmd_unit->sess_list), entry) {
+ pool_sess = sess_iu->pool_sess;
+
+ req_ops = (struct rtrs_clt_req_ops){
+ .priv = sess_iu,
+ .conf_fn = msg_cmd_conf,
+ };
+
+ /*
+ * The user expects each node to be able to send back data of this "size" as
+ * response.
+ * So divide the user buffer into chunks of "size", and send them to each leg.
+ */
+ sg_init_one(&sess_iu->sg, buf + (i * size), size);
+
+ trace_rmr_clt_cmd_with_rsp(READ, sess_iu);
+
+ err = rtrs_clt_request(READ, &req_ops, pool_sess->clt_sess->rtrs, sess_iu->permit,
+ vec, (1 + nr), size, &sess_iu->sg, 1);
+ if (err) {
+ /*
+ * We want to deal with this error just like we deal with the error
+ * received from the conf function returned from rtrs.
+ * This would help us to inform the user the correct number of commands
+ * which failed on the rmr level (rtrs is also rmr level for user).
+ */
+ pr_warn("rtrs_clt_request Failed with err %d\n", err);
+ sess_iu->errno = err;
+ INIT_WORK(&sess_iu->work, rmr_clt_cmd_err_conf);
+ schedule_work(&sess_iu->work);
+ err = 0;
+ }
+
+ i++;
+ }
+
+	/*
+	 * There was no session to send the command to.
+	 */
+ if (i == 0) {
+ rmr_clt_put_cu(cmd_unit);
+ err = -EINVAL;
+ }
+
+out:
+ kfree(vec);
+
+ return err;
+}
+EXPORT_SYMBOL(rmr_clt_cmd_with_rsp);
+
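+/*
+ * Usage sketch (illustrative, assumed values): the response buffer must
+ * be kmalloc()'d and hold one "size" slice per possible pool member.
+ *
+ *	struct kvec v = { .iov_base = cmd, .iov_len = cmd_len };
+ *	size_t size = RSP_SIZE;              // per-member response size
+ *	void *buf = kmalloc(RMR_POOL_MAX_SESS * size, GFP_KERNEL);
+ *
+ *	err = rmr_clt_cmd_with_rsp(pool, my_cmd_done, priv, &v, 1,
+ *				   buf, RMR_POOL_MAX_SESS * size, size);
+ *
+ * my_cmd_done() and RSP_SIZE are hypothetical; the conf callback receives
+ * the number of per-member failures as its errno argument.
+ */
+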
+/**
+ * rmr_clt_send_cmd_with_data() - send command containing data buffer as a payload or response
+ *
+ * @pool: rmr pool to send command
+ * @pool_sess: client pool session used to send
+ * @msg: initialized command message describing the command
+ * @buf: pointer to the data buffer for data transfers
+ * @buflen: size of the buffer in bytes
+ *
+ * Description:
+ * Performs sending the command described by msg with a payload or response
+ * in the buf.
+ *
+ * Return:
+ * 0 on success, error code otherwise.
+ *
+ * Context:
+ * This function blocks while sending the buffer.
+ *
+ * Locks:
+ * should be called under srcu_read_lock since it uses pool_sess
+ */
+int rmr_clt_send_cmd_with_data(struct rmr_pool *pool, struct rmr_clt_pool_sess *pool_sess,
+ struct rmr_msg_pool_cmd *msg,
+ void *buf, unsigned int buflen)
+{
+ struct rmr_clt_sess_iu *sess_iu;
+ struct rmr_clt_sess *clt_sess = pool_sess->clt_sess;
+ struct kvec vec = {
+ .iov_base = msg,
+ .iov_len = sizeof(*msg)
+ };
+ int errno = 0, err = 0;
+ int dir;
+
+ switch (msg->cmd_type) {
+ case RMR_CMD_MAP_CHECK:
+ case RMR_CMD_READ_MAP_BUF:
+ case RMR_CMD_MAP_GET_VER:
+ case RMR_CMD_MD_SEND:
+ case RMR_CMD_MAP_SET_VER:
+ dir = READ;
+ break;
+ case RMR_CMD_MAP_TEST:
+ case RMR_CMD_SEND_MAP_BUF:
+ case RMR_CMD_SEND_MD_BUF:
+ dir = WRITE;
+ break;
+ default:
+ pr_err("%s: pool %s cmd type %u is not supported\n",
+ __func__, pool->poolname, msg->cmd_type);
+ return -EINVAL;
+ }
+
+	/* TODO: why use the IO connection here and not the admin connection? */
+ if (clt_sess->state == RMR_CLT_SESS_DISCONNECTED) {
+ pr_debug("The rmr client session %s state is disconnected\n", clt_sess->sessname);
+ err = -EINVAL;
+ goto err;
+ }
+
+ sess_iu = rmr_msg_get_iu(pool_sess, RTRS_IO_CON, RTRS_PERMIT_WAIT, 2);
+ if (unlikely(!sess_iu)) {
+ err = -ENOMEM;
+ goto err;
+ }
+
+ sess_iu->buf = buf;
+ sg_init_one(&sess_iu->sg, buf, buflen);
+
+ err = send_usr_msg(clt_sess->rtrs, dir, sess_iu,
+ &vec, 1, buflen, &sess_iu->sg, 1,
+ msg_pool_cmd_map_content_conf, &errno, WAIT);
+	if (likely(!err))
+		err = errno;
+
+	/* Put the IU exactly once, on both the success and the error paths */
+	rmr_msg_put_iu(pool_sess, sess_iu);
+
+err:
+ return err;
+}
+
+/**
+ * rmr_clt_pool_member_synced() - check if the pool member has no data to sync
+ *
+ * @pool: rmr pool in which we perform the check
+ * @member_id: id of the pool member to check
+ *
+ * Description:
+ * Send the check map command to the pool member with the specified id.
+ * The pool member reports whether it has unsynced chunks or not.
+ *
+ * Return:
+ * negative error code if the send failed, 0 if the pool member is not
+ * fully synced, 1 if the pool member is synced (no dirty chunks in its map).
+ *
+ * Context:
+ * This function blocks while sending the command.
+ *
+ * Locks:
+ * no
+ */
+int rmr_clt_pool_member_synced(struct rmr_pool *pool, u8 member_id)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_msg_pool_cmd_rsp rsp = {};
+ struct rmr_msg_pool_cmd msg = {};
+ int ret = 0, idx;
+ enum rmr_clt_pool_sess_state state;
+
+ pr_debug("start looking for session with member_id=%u\n", member_id);
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+
+ pool_sess = __find_sess_by_member_id(pool, member_id);
+ if (!pool_sess) {
+ pr_err("in pool %s failed to find sess with a member_id=%u\n",
+ pool->poolname, member_id);
+ ret = -ENOENT;
+ goto out;
+ }
+
+ pr_debug("found session %s with member_id=%u\n",
+ pool_sess->sessname, member_id);
+
+ state = atomic_read(&pool_sess->state);
+ if (state == RMR_CLT_POOL_SESS_FAILED ||
+ state == RMR_CLT_POOL_SESS_REMOVING) {
+ pr_debug("pool %s session %s is in %s state, cannot send cmd %s\n",
+ pool->poolname, pool_sess->sessname,
+ rmr_clt_sess_state_str(state), rmr_get_cmd_name(msg.cmd_type));
+ ret = -EINVAL;
+ goto out;
+ }
+
+ rmr_clt_init_cmd(pool, &msg);
+ msg.cmd_type = RMR_CMD_MAP_CHECK;
+
+ pr_debug("send cmd %u to %s\n", msg.cmd_type, pool_sess->sessname);
+ ret = rmr_clt_send_cmd_with_data(pool, pool_sess, &msg, &rsp, sizeof(rsp));
+ if (ret) {
+ pr_err("%s: For pool %s failed to %s, err %d\n",
+ __func__, pool->poolname, rmr_get_cmd_name(msg.cmd_type), ret);
+ goto out;
+ }
+
+ if (rsp.value)
+		ret = 1; /* the other side reported its map is clear */
+
+ pr_debug("send cmd %u to %s is done\n", msg.cmd_type, pool_sess->sessname);
+out:
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+ return ret;
+}
+EXPORT_SYMBOL(rmr_clt_pool_member_synced);
+
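+/*
+ * Caller sketch (illustrative): the return value is tri-state, so it
+ * must not be treated as a plain 0/!0 error code.
+ *
+ *	ret = rmr_clt_pool_member_synced(pool, member_id);
+ *	if (ret < 0)
+ *		return ret;        // send failed
+ *	if (ret == 0)
+ *		wait_for_resync(); // member still has dirty chunks
+ *	// ret == 1: member is fully synced
+ *
+ * wait_for_resync() is a hypothetical placeholder.
+ */
+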
+/**
+ * rmr_clt_md_to_buf() - Fill the buffer with the pool metadata
+ *
+ * @pool: rmr pool that contains the metadata. It must be a non-sync pool,
+ * either a client or a server pool.
+ * @buf: buffer to fill with the metadata.
+ */
+static void rmr_clt_md_to_buf(struct rmr_pool *pool, u8 *buf)
+{
+ struct rmr_pool_md *pool_md;
+ struct rmr_srv_md *srv_md;
+
+ if (pool->is_clt) {
+ pool_md = (struct rmr_pool_md *)buf;
+ /* copy the entire client pool md */
+ memcpy(pool_md, &pool->pool_md, sizeof(struct rmr_pool_md));
+ return;
+ }
+
+ srv_md = (struct rmr_srv_md *)(&buf[RMR_CLT_MD_SIZE]);
+ memcpy(srv_md, &pool->pool_md.srv_md[0], RMR_SRV_MD_SIZE);
+}
+
+/**
+ * rmr_clt_pool_send_md_all() - Send metadata of an rmr pool
+ *
+ * @src_pool: pool whose metadata is sent; must be a non-sync pool.
+ * @clt_pool: client pool whose sessions receive the metadata.
+ *
+ * Description:
+ * Send metadata of the src pool to all sessions of the client pool.
+ * 1) If the client pool is a sync pool, it sends the entire server pool
+ *    metadata back after the leader reads the metadata of its connected
+ *    nodes.
+ * 2) If it is non-sync, send the client pool metadata to the storage node
+ *    backups.
+ */
+int rmr_clt_pool_send_md_all(struct rmr_pool *src_pool, struct rmr_pool *clt_pool)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_msg_pool_cmd msg = {};
+ void *buf;
+ u32 buflen;
+ int err = 0, idx;
+
+ if (!clt_pool) {
+ pr_err("Cannot send metadata when clt_pool is NULL\n");
+ return -EINVAL;
+ }
+
+ if (src_pool->sync) {
+ pr_err("Cannot send metadata when src_pool is sync\n");
+ return -EINVAL;
+ }
+
+	buf = kzalloc(RMR_MD_SIZE, GFP_KERNEL);
+	if (!buf)
+		return -ENOMEM;
+	buflen = RMR_MD_SIZE;
+
+ rmr_clt_md_to_buf(src_pool, buf);
+
+ /*
+ * It will continue to send the md to the next session even if the previous send failed.
+ */
+ idx = srcu_read_lock(&clt_pool->sess_list_srcu);
+ list_for_each_entry_srcu(pool_sess, &clt_pool->sess_list, entry,
+ (srcu_read_lock_held(&clt_pool->sess_list_srcu))) {
+ pr_debug("Start sending md for pool %s; to session %s with member_id %d\n",
+ src_pool->poolname, pool_sess->sessname, pool_sess->member_id);
+
+ rmr_clt_init_cmd(clt_pool, &msg);
+ msg.cmd_type = RMR_CMD_SEND_MD_BUF;
+ msg.send_md_buf_cmd = (struct rmr_msg_send_md_buf_cmd) {
+ .sync = clt_pool->sync,
+ /* the receiver of buffer is the leader */
+ .receiver_id = pool_sess->member_id,
+ /* change flags in cmd message */
+ .flags = RMR_OP_MD_WRITE,
+ };
+
+ err = rmr_clt_send_cmd_with_data(clt_pool, pool_sess, &msg, buf, buflen);
+ if (err) {
+ pr_debug("Cannot send the clt/srv_md of entire pool to the pool sess %s\n",
+ pool_sess->sessname);
+ continue;
+ }
+ }
+
+ pr_debug("send_md done\n");
+
+ kfree(buf);
+
+ srcu_read_unlock(&clt_pool->sess_list_srcu, idx);
+ return err;
+}
+EXPORT_SYMBOL(rmr_clt_pool_send_md_all);
+
+static int rmr_clt_start_send_md(struct rmr_pool *pool)
+{
+ return rmr_clt_pool_send_md_all(pool, pool);
+}
+
+/**
+ * rmr_clt_del_stor_from_pool() - Notify pool members that a storage node is leaving
+ *
+ * @pool_sess: The pool session of the departing storage node.
+ * @delete: True for a permanent deletion (%RMR_POOL_INFO_MODE_DELETE);
+ * false for a temporary disassembly (%RMR_POOL_INFO_MODE_DISASSEMBLE).
+ *
+ * Sends a POOL_INFO REMOVE message to all other active pool members so they
+ * can update their dirty maps and membership state accordingly.
+ *
+ * Return:
+ * 0 on success, negative error code on failure.
+ */
+int rmr_clt_del_stor_from_pool(struct rmr_clt_pool_sess *pool_sess, bool delete)
+{
+ enum rmr_pool_info_mode mode;
+ int err;
+
+ if (delete)
+ mode = RMR_POOL_INFO_MODE_DELETE;
+ else
+ mode = RMR_POOL_INFO_MODE_DISASSEMBLE;
+
+ err = rmr_clt_send_pool_info(pool_sess, RMR_POOL_INFO_OP_REMOVE, mode, false);
+ if (err) {
+ pr_err("rmr_clt_send_pool_info failed for session\n");
+ return err;
+ }
+
+ return 0;
+}
+
+static int __init rmr_client_init(void)
+{
+ int err;
+
+ pr_info("Loading module %s, version %s, proto %s\n", KBUILD_MODNAME,
+ RMR_VER_STRING, RMR_PROTO_VER_STRING);
+
+ err = rmr_clt_create_sysfs_files();
+ if (err) {
+ pr_err("Failed to load module, creating sysfs device files failed, err: %d\n",
+ err);
+ return err;
+ }
+
+ return 0;
+}
+
+static void __exit rmr_client_exit(void)
+{
+ struct rmr_pool *pool, *tmp;
+
+ pr_info("Unloading module\n");
+
+ list_for_each_entry_safe(pool, tmp, &pool_list, entry)
+ (void) rmr_clt_remove_pool_from_sysfs(pool, NULL);
+
+ rmr_clt_destroy_sysfs_files();
+ pr_info("Module unloaded\n");
+}
+
+module_init(rmr_client_init);
+module_exit(rmr_client_exit);
diff --git a/drivers/infiniband/ulp/rmr/rmr-map-mgmt.c b/drivers/infiniband/ulp/rmr/rmr-map-mgmt.c
new file mode 100644
index 000000000000..cade5dbf2e20
--- /dev/null
+++ b/drivers/infiniband/ulp/rmr/rmr-map-mgmt.c
@@ -0,0 +1,933 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Reliable multicast over RTRS (RMR) — client MAP-exchange management
+ *
+ * Copyright (c) 2026 IONOS SE
+ */
+
+#undef pr_fmt
+#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
+
+#include <linux/module.h>
+#include <linux/blkdev.h>
+#include <linux/slab.h>
+#include <linux/wait.h>
+#include <linux/sched.h>
+
+#include "rmr-clt.h"
+#include "rmr-clt-trace.h"
+
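+/*
+ * Ask the storage node behind pool_sess whether its dirty map is empty.
+ * The response is processed in rmr_clt_handle_map_check_rsp().
+ */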
+void send_map_check(struct rmr_clt_pool_sess *pool_sess)
+{
+ struct rmr_msg_pool_cmd msg = {};
+ struct rmr_pool *pool = pool_sess->pool;
+ int err;
+
+ rmr_clt_init_cmd(pool, &msg);
+ msg.cmd_type = RMR_CMD_MAP_CHECK;
+
+ err = rmr_clt_pool_send_cmd(pool_sess, &msg, WAIT);
+ if (err)
+ pr_err("%s: For sess %s, %s failed with err %d\n",
+ __func__, pool_sess->sessname, rmr_get_cmd_name(msg.cmd_type), err);
+}
+
+void send_store_check(struct rmr_clt_pool_sess *pool_sess)
+{
+ struct rmr_msg_pool_cmd msg = {};
+ struct rmr_pool *pool = pool_sess->pool;
+ int err;
+
+ rmr_clt_init_cmd(pool, &msg);
+ msg.cmd_type = RMR_CMD_STORE_CHECK;
+
+ err = rmr_clt_pool_send_cmd(pool_sess, &msg, WAIT);
+ if (err)
+ pr_err("%s: For sess %s, %s failed with err %d\n",
+ __func__, pool_sess->sessname, rmr_get_cmd_name(msg.cmd_type), err);
+}
+
+int send_map_get_version(struct rmr_clt_pool_sess *pool_sess, u64 *ver)
+{
+ struct rmr_msg_pool_cmd_rsp rsp = {};
+ struct rmr_msg_pool_cmd msg = {};
+ struct rmr_pool *pool = pool_sess->pool;
+ int err;
+
+ rmr_clt_init_cmd(pool, &msg);
+ msg.cmd_type = RMR_CMD_MAP_GET_VER;
+
+ err = rmr_clt_send_cmd_with_data(pool, pool_sess, &msg, &rsp, sizeof(rsp));
+ if (err) {
+ pr_err("%s: For sess %s, %s failed with err %d\n",
+ __func__, pool_sess->sessname, rmr_get_cmd_name(msg.cmd_type), err);
+ return err;
+ }
+
+ *ver = rsp.value;
+
+ return 0;
+}
+
+int send_discard(struct rmr_clt_pool_sess *pool_sess, u8 cmd_type, u8 member_id)
+{
+ struct rmr_msg_pool_cmd msg = {};
+ struct rmr_pool *pool = pool_sess->pool;
+ int err;
+
+ rmr_clt_init_cmd(pool, &msg);
+ msg.cmd_type = cmd_type;
+ msg.send_discard_cmd.member_id = member_id;
+
+ err = rmr_clt_pool_send_cmd(pool_sess, &msg, WAIT);
+ if (err) {
+ pr_err("%s: For sess %s, %s failed with err %d\n",
+ __func__, pool_sess->sessname, rmr_get_cmd_name(msg.cmd_type), err);
+ }
+
+ return err;
+}
+
+int rmr_clt_handle_map_check_rsp(struct rmr_clt_pool_sess *pool_sess,
+ struct rmr_msg_pool_cmd_rsp *rsp)
+{
+ struct rmr_pool *pool = pool_sess->pool;
+ struct rmr_dirty_id_map *map;
+
+ pr_debug("pool %s sess %s member_id %u, rsp->value=%llu\n",
+ pool->poolname, pool_sess->sessname, rsp->member_id, rsp->value);
+ /* a zero value means the map on the storage node is not empty */
+ if (!rsp->value)
+ return 0;
+
+ pr_debug("pool %s server with id %u has an empty dirty map, let's clean it\n",
+ pool->poolname, rsp->member_id);
+ map = rmr_pool_find_map(pool, rsp->member_id);
+ if (!map) {
+ /* TODO: can we recover from a missing map instead of failing? */
+ pr_err("%s: pool %s no map found for member_id %u\n",
+ __func__, pool->poolname, rsp->member_id);
+ return -EINVAL;
+ }
+
+ if (!rmr_map_empty(map)) {
+ pr_debug("pool %s dirty map for member_id %d is not empty, map->ts %lu (now %lu)\n",
+ pool->poolname, rsp->member_id, map->ts, jiffies);
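+ /*
+ * The server reported an empty map but our local copy is not
+ * empty; clear it only once it has been idle for
+ * RMR_MAP_CLEAN_DELAY_MS (map->ts is refreshed by every
+ * rmr_clt_map_add_id() call).
+ */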
+ if (time_after(jiffies, map->ts + msecs_to_jiffies(RMR_MAP_CLEAN_DELAY_MS))) {
+ pr_info("%s: pool %s clear dirty map for member_id %d\n",
+ __func__, pool->poolname, rsp->member_id);
+ rmr_map_unset_dirty_all(map);
+ map->ts = jiffies;
+ }
+ }
+
+ pr_debug("pool %s map with member_id %u cleaned\n",
+ pool->poolname, map->member_id);
+ return 0;
+}
+
+int rmr_clt_handle_store_check_rsp(struct rmr_clt_pool_sess *pool_sess,
+ struct rmr_msg_pool_cmd_rsp *rsp)
+{
+ struct rmr_pool *pool = pool_sess->pool;
+ int err = 0;
+
+ pr_debug("pool %s sess %s member_id %u, rsp->value=%llu\n",
+ pool->poolname, pool_sess->sessname, rsp->member_id, rsp->value);
+ if (!rsp->value) {
+ pr_debug("pool %s sess %s (state=%d) reported that store is not available, changing state\n",
+ pool->poolname, pool_sess->sessname, atomic_read(&pool_sess->state));
+ return 0;
+ }
+ pr_info("pool %s sess %s (state=%d) reported that store is available, changing state\n",
+ pool->poolname, pool_sess->sessname, atomic_read(&pool_sess->state));
+
+ pool_sess_change_state(pool_sess, RMR_CLT_POOL_SESS_RECONNECTING);
+
+ if (!pool_sess->maintenance_mode) {
+ err = rmr_clt_pool_try_enable(pool);
+ if (err) {
+ pr_err("%s: pool %s try_enable failed for sess %s: %d\n",
+ __func__, pool->poolname, pool_sess->sessname, err);
+ return err;
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Pre-requisite: rcu read lock should be held by caller
+ */
+static struct rmr_clt_pool_sess *rmr_clt_get_first_reconnecting_session(struct rmr_pool *pool)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ if (atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_RECONNECTING)
+ return pool_sess;
+ }
+
+ return NULL;
+}
+
+/**
+ * rmr_clt_pool_map_xfer() - transfer dirty maps between rmr client and server
+ *
+ * @pool: the rmr pool used for map transfers
+ * @pool_sess: client pool session that is used for map transfer
+ * @cmd_type: pool command type generated for this transfer, for now only
+ * RMR_CMD_READ_MAP_BUF, RMR_CMD_SEND_MAP_BUF, RMR_CMD_MAP_TEST are used
+ * @buf: pointer to the data buffer for data transfers
+ * @buflen: size of the buffer in bytes
+ * @map_idx: index of the map in the dirty map array from which we start to send or
+ * receive the data
+ * @slp_idx: key in the map from which we start to send/receive the data about the maps
+ *
+ * Description:
+ * Performs transfer of the information about the dirty maps, starting from the map at
+ * position map_idx in the array of dirty maps and from slp_idx within that map.
+ * cmd types are handled as follows:
+ * RMR_CMD_READ_MAP_BUF - read the information about the maps from the pool and fill buf
+ * RMR_CMD_SEND_MAP_BUF - send buf with filled data to the pool
+ * RMR_CMD_MAP_TEST - send the buf with data to the pool to perform map comparison
+ *
+ * Return:
+ * 0 on success, error code otherwise.
+ *
+ * Context:
+ * This function blocks while sending the buffer.
+ *
+ * Locks:
+ * should be called under srcu_read_lock since it uses pool_sess
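+ *
+ * Example (sketch of the read loop in rmr_clt_read_map() below; buf, buflen,
+ * map_idx and slp_idx are set up by the caller):
+ *
+ * idx = srcu_read_lock(&pool->sess_list_srcu);
+ * err = rmr_clt_pool_map_xfer(pool, pool_sess, RMR_CMD_READ_MAP_BUF,
+ * buf, buflen, map_idx, slp_idx);
+ * srcu_read_unlock(&pool->sess_list_srcu, idx);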
+ */
+static int rmr_clt_pool_map_xfer(struct rmr_pool *pool, struct rmr_clt_pool_sess *pool_sess,
+ int cmd_type, void *buf, unsigned int buflen,
+ u8 map_idx, u64 slp_idx)
+{
+ struct rmr_msg_pool_cmd msg = {};
+ int err;
+
+ rmr_clt_init_cmd(pool, &msg);
+ msg.cmd_type = cmd_type;
+
+ msg.map_buf_cmd.map_idx = map_idx;
+ msg.map_buf_cmd.slp_idx = slp_idx;
+
+ err = rmr_clt_send_cmd_with_data(pool, pool_sess, &msg, buf, buflen);
+ if (err) {
+ pr_debug("pool %s failed to send map xfer cmd %u, err %d\n",
+ pool->poolname, cmd_type, err);
+ return err;
+ }
+
+ return 0;
+}
+
+int rmr_clt_read_map(struct rmr_pool *pool)
+{
+ struct rmr_clt_pool_sess *pool_sess = NULL;
+ struct rmr_map_buf_hdr *map_buf_hdr;
+ u8 map_idx = 0;
+ u64 slp_idx = 0;
+ int err = 0, buflen, idx;
+ void *buf;
+
+ idx = srcu_read_lock(&pool->sess_list_srcu);
+
+ pool_sess = rmr_clt_get_first_reconnecting_session(pool);
+ if (!pool_sess) {
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+ pr_err("%s: No reconnecting session found\n", __func__);
+ return -EINVAL;
+ }
+
+ buflen = RTRS_IO_LIMIT;
+ buf = kzalloc(buflen, GFP_KERNEL);
+ if (!buf) {
+ pr_err("%s: Error allocating buffer\n", __func__);
+ err = -ENOMEM;
+ goto ret;
+ }
+
+ while (true) {
+ err = rmr_clt_pool_map_xfer(pool, pool_sess, RMR_CMD_READ_MAP_BUF,
+ buf, buflen, map_idx, slp_idx);
+ if (err) {
+ pr_debug("rmr_clt_pool_map_xfer failed for pool %s, err %d\n",
+ pool->poolname, err);
+ goto ret_free;
+ }
+
+ map_buf_hdr = (struct rmr_map_buf_hdr *)buf;
+ if (map_buf_hdr->member_id == 0)
+ break;
+
+ err = rmr_pool_save_map(pool, buf, buflen, false);
+ if (err) {
+ pr_err("rmr_pool_save_map failed\n");
+ goto ret_free;
+ }
+
+ map_idx = map_buf_hdr->map_idx;
+ slp_idx = map_buf_hdr->slp_idx;
+ }
+
+ret_free:
+ kfree(buf);
+
+ret:
+ srcu_read_unlock(&pool->sess_list_srcu, idx);
+
+ return err;
+}
+
+/**
+ * rmr_clt_spread_map() - Spread the map of the storage node connected via @pool_sess_chosen
+ *
+ * @pool: The pool
+ * @pool_sess_chosen: pool session from where the map is to be updated from
+ * @enable: Whether the last MAP_DONE command should have the enable param set or not
+ * @skip_normal: If true, freeze IOs before spreading and silently skip any NORMAL
+ * sessions encountered in the loop (used in Case 1 recovery where
+ * pool_sess_chosen is itself a NORMAL session that is still serving IOs).
+ * If false, encountering a NORMAL session is treated as an error.
+ *
+ * Description:
+ * This function spreads the map contained in the storage node connected by given pool
+ * session. The param enable denotes whether the map update should result in the storage
+ * going to NORMAL state or not. This is controlled by the enable param in the last MAP_DONE
+ * message.
+ *
+ * Return:
+ * 0 on success
+ * Error value on failure
+ *
+ * Context:
+ * srcu_read_lock should be held while calling this function.
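+ *
+ * Example (sketch of a Case 1 recovery, where chosen_sess is a placeholder
+ * for a NORMAL source session picked by the caller):
+ *
+ * idx = srcu_read_lock(&pool->sess_list_srcu);
+ * err = rmr_clt_spread_map(pool, chosen_sess, true, true);
+ * srcu_read_unlock(&pool->sess_list_srcu, idx);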
+ */
+int rmr_clt_spread_map(struct rmr_pool *pool, struct rmr_clt_pool_sess *pool_sess_chosen,
+ bool enable, bool skip_normal)
+{
+ struct rmr_clt_pool *clt_pool = (struct rmr_clt_pool *)pool->priv;
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_msg_pool_cmd msg = {};
+ int state, err = 0;
+
+ rmr_clt_init_cmd(pool, &msg);
+
+ /*
+ * If we expect NORMAL session, then we should expect IOs running.
+ * Which is why we should freeze IOs before doing map_update.
+ */
+ if (skip_normal) {
+ /* Freeze IOs */
+ rmr_clt_pool_io_freeze(clt_pool);
+
+ /* Wait for all completion */
+ rmr_clt_pool_io_wait_complete(clt_pool);
+ }
+
+ /*
+ * TODO: Use rmr_clt_handle_discard to check whether the pool
+ * session has pending discard request to be sent.
+ *
+ * Enable this when we fix replace.
+ *
+ err = rmr_clt_handle_discard(pool);
+ if (err) {
+ pr_err("%s: discard handling failed\n", __func__);
+ goto err;
+ }
+ */
+
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ if (pool_sess == pool_sess_chosen)
+ continue;
+
+ state = atomic_read(&pool_sess->state);
+ if (state == RMR_CLT_POOL_SESS_NORMAL) {
+ if (skip_normal)
+ continue;
+ pr_err("%s: pool %s unexpected NORMAL session %s during spread\n",
+ __func__, pool->poolname, pool_sess->sessname);
+ err = -EINVAL;
+ goto err_out;
+ }
+
+ if (state != RMR_CLT_POOL_SESS_RECONNECTING ||
+ pool_sess->maintenance_mode)
+ continue;
+
+ msg.cmd_type = RMR_CMD_MAP_READY;
+
+ err = rmr_clt_pool_send_cmd(pool_sess, &msg, WAIT);
+ if (err) {
+ pr_err("%s: %s failed\n", __func__, rmr_get_cmd_name(msg.cmd_type));
+ goto err_dis;
+ }
+
+ msg.cmd_type = RMR_CMD_MAP_SEND;
+ msg.map_send_cmd.receiver_member_id = pool_sess->member_id;
+ err = rmr_clt_pool_send_cmd(pool_sess_chosen, &msg, WAIT);
+ if (err) {
+ pr_err("%s: %s failed\n", __func__, rmr_get_cmd_name(msg.cmd_type));
+ goto err_dis;
+ }
+
+ msg.cmd_type = RMR_CMD_MAP_DONE;
+ msg.map_done_cmd.enable = enable;
+
+ err = rmr_clt_pool_send_cmd(pool_sess, &msg, WAIT);
+ if (err) {
+ pr_err("%s: %s failed\n", __func__, rmr_get_cmd_name(msg.cmd_type));
+ goto err_dis;
+ }
+ }
+
+ /* Unfreeze IOs and wake up */
+ if (skip_normal)
+ rmr_clt_pool_io_unfreeze(clt_pool);
+
+ return 0;
+
+err_dis:
+ list_for_each_entry_srcu(pool_sess, &pool->sess_list, entry,
+ (srcu_read_lock_held(&pool->sess_list_srcu))) {
+ if (pool_sess == pool_sess_chosen)
+ continue;
+
+ if (atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_NORMAL) {
+ if (skip_normal)
+ continue;
+ pr_err("%s: pool %s unexpected NORMAL session %s during spread\n",
+ __func__, pool->poolname, pool_sess->sessname);
+ }
+
+ msg.cmd_type = RMR_CMD_MAP_DISABLE;
+ rmr_clt_pool_send_cmd(pool_sess, &msg, WAIT);
+ }
+
+err_out:
+ /* Unfreeze IOs and wake up */
+ if (skip_normal)
+ rmr_clt_pool_io_unfreeze(clt_pool);
+
+ return err;
+}
+
+/**
+ * rmr_clt_set_pool_sess_mm() - Set the rmr clt pool session to maintenance mode
+ *
+ * @pool_sess: The rmr clt pool session to set in maintenance mode
+ *
+ * Description:
+ * This function does the necessary work required, like setting the pool session to
+ * maintenance mode and updating the state.
+ * It then also communicates this state change to the corresponding storage node.
+ *
+ * Return:
+ * 0 on success
+ * Error value on failure
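+ *
+ * Example (sketch; pool_sess is a placeholder the caller looked up, e.g.
+ * from a sysfs store handler):
+ *
+ * err = rmr_clt_set_pool_sess_mm(pool_sess);
+ * if (err)
+ * pr_err("failed to enter maintenance mode: %d\n", err);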
+ */
+int rmr_clt_set_pool_sess_mm(struct rmr_clt_pool_sess *pool_sess)
+{
+ struct rmr_pool *pool = pool_sess->pool;
+ int err;
+
+ pr_info("%s: Putting sess %s of pool %s in maintenance mode\n",
+ __func__, pool_sess->sessname, pool->poolname);
+
+ if (pool_sess->maintenance_mode)
+ goto send_message;
+
+ /*
+ * If the pool_sess is to be put in maintenance mode,
+ * update relevant states and params, then send a message to the storage node.
+ *
+ * We do not need any kind of locking for this, because of the way IO units (IU) are
+ * allocated & sent. The mm mode update & the state change can happen at multiple places.
+ *
+ * 1) If the state changes before the pool_sess is picked up into the IU, then we are safe.
+ * 2) If the state changes after the pool_sess is picked up into the IU, but before
+ * rmr_clt_request, the IO will be failed in rmr_clt_request.
+ * 3) If the state changes after rmr_clt_request, the IO would be sent to the storage node
+ * for that pool_sess. Then we have 2 cases:
+ * a) The message for maintenance_mode is received by the storage node before the IO;
+ * the storage node will then fail the IO. The failure is then handled by the client.
+ * b) The message for maintenance_mode is received by the storage node after the IO;
+ * the storage node will then process the IO and return success to the client. In this
+ * case we are also fine, since the IO got processed successfully.
+ */
+ pool->map_ver++;
+ pool_sess->maintenance_mode = true;
+ pool_sess_change_state(pool_sess, RMR_CLT_POOL_SESS_RECONNECTING);
+
+send_message:
+ err = send_msg_enable_pool(pool_sess, 0);
+ if (err) {
+ pr_err("%s: send_msg_enable_pool failed for pool %s. Err %d\n",
+ __func__, pool->poolname, err);
+ }
+
+ return err;
+}
+
+/**
+ * rmr_clt_unset_pool_sess_mm() - Clear the rmr clt pool sessions maintenance mode
+ *
+ * @pool_sess: The rmr clt pool session to clear maintenance mode of
+ *
+ * Description:
+ * This function clears the maintenance mode of the given rmr clt pool session.
+ * It also does the map_update which essentially brings the pool_session and its
+ * corresponding storage node to NORMAL state.
+ *
+ * Return:
+ * 0 on success
+ * Error value on failure
+ */
+int rmr_clt_unset_pool_sess_mm(struct rmr_clt_pool_sess *pool_sess)
+{
+ struct rmr_pool *pool = pool_sess->pool;
+ int err;
+
+ pr_info("%s: Putting to sess %s of pool %s out of maintenance mode\n",
+ __func__, pool_sess->sessname, pool->poolname);
+
+ /*
+ * Cannot be in NORMAL and CREATED states while in maintenance mode.
+ */
+ WARN_ON(atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_NORMAL);
+ WARN_ON(atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_CREATED);
+
+ /*
+ * If this pool_sess is getting removed, we fail unset maintenance mode
+ */
+ if (atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_REMOVING)
+ return -EINVAL;
+
+ /*
+ * First unset mm of storage node
+ */
+ err = send_msg_enable_pool(pool_sess, 1);
+ if (err) {
+ pr_err("Failed to send enable to pool %s. Err %d\n",
+ pool->poolname, err);
+ return err;
+ }
+
+ /* Now do this */
+ pool_sess->maintenance_mode = false;
+
+ /*
+ * For FAILED states, further action would happen when it goes to RECONNECTING state
+ */
+ if (atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_FAILED)
+ return 0;
+
+ /*
+ * Since we are in RECONNECTING state, we do map update here.
+ */
+ err = rmr_clt_pool_try_enable(pool);
+ if (err) {
+ pr_err("%s: pool %s try_enable failed for sess %s: %d\n",
+ __func__, pool->poolname, pool_sess->sessname, err);
+ return err;
+ }
+
+ return 0;
+}
+
+void msg_pool_cmd_map_content_conf(struct work_struct *work)
+{
+ struct rmr_clt_sess_iu *sess_iu = container_of(work, struct rmr_clt_sess_iu, work);
+ struct rmr_clt_pool_sess *pool_sess = sess_iu->pool_sess;
+
+ pr_debug("%s: session %s conf with errno %d\n",
+ __func__, pool_sess->sessname, sess_iu->errno);
+
+ wake_up_iu_comp(sess_iu);
+ rmr_msg_put_iu(pool_sess, sess_iu);
+}
+
+static void send_map_update_done(struct work_struct *work)
+{
+ struct rmr_clt_sess_iu *sess_iu = container_of(work, struct rmr_clt_sess_iu, work);
+ struct rmr_iu *iu = sess_iu->rmr_iu;
+ struct rmr_clt_pool_sess *pool_sess = sess_iu->pool_sess;
+ int errno = sess_iu->errno;
+
+ pr_debug("%s: Session %s, err %d, iu %p\n",
+ __func__, pool_sess->sessname, errno, iu);
+ WARN_ON(atomic_read(&pool_sess->state) == RMR_CLT_POOL_SESS_CREATED);
+
+ /*
+ * iu->errno is left set from the IO failure. If even one
+ * map_add succeeds, we clear iu->errno and the main IO
+ * succeeds; the failed map_adds simply trigger a session
+ * state change to FAILED.
+ */
+ if (!errno) {
+ iu->errno = 0;
+ } else {
+ pr_err_ratelimited("%s: for sess %s got errno: %d\n",
+ __func__, pool_sess->sessname, errno);
+
+ if (iu->errno)
+ /* only the last error is reported */
+ iu->errno = errno;
+ pool_sess_change_state(pool_sess, RMR_CLT_POOL_SESS_FAILED);
+ }
+
+ pr_debug("%s: Before dec and test iu %p refcnt=%d\n",
+ __func__, iu, refcount_read(&iu->refcount));
+
+ if (refcount_dec_and_test(&iu->refcount)) {
+ rmr_conf_fn *conf = iu->conf;
+
+ pr_debug("all maps updated, call conf %p withh errno %d\n",
+ conf, errno);
+ (*conf)(iu->priv, iu->errno);
+ }
+}
+
+/**
+ * rmr_clt_send_map_update() - Send map update to all connected storage nodes
+ *
+ * @pool: The client pool of whose sessions the update is to be sent
+ * @iu: The IO unit containing the information for the update
+ *
+ * Description:
+ * Send map update, using the underlying RTRS <-> RDMA
+ * Currently we use the same rmr_iu as IO, since it saves us time.
+ * When an IO fails, and a MAP_ADD is to be sent, the code reuses the
+ * same rmr_iu used for IO. This way we do not spend time acquiring
+ * and initializing another rmr_iu.
+ *
+ * A map update currently can either be a MAP_ADD or a MAP_CLEAR.
+ * The caller must make sure the basic and required information for both
+ * the above commands is updated in the rmr_iu.
+ * Basic being the pool group_id, msg hdr type, etc.
+ * Required being the following,
+ * MAP_ADD requires the rmr_id_t chunk numbers, failed_id array and failed_cnt
+ * MAP_CLEAR requires the rmr_id_t and the member_id
+ *
+ * Return:
+ * 0 on success. This means the map_update was sent successfully.
+ * The subsequent status (err or not) goes to iu->conf call,
+ * so the caller should check that too.
+ *
+ * Error value on failure. When this function returns error,
+ * be aware that the iu->conf will not be called.
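+ *
+ * Example (sketch of a MAP_ADD send, mirroring sched_map_add() below, which
+ * also fills failed_id and failed_cnt in the reused iu):
+ *
+ * iu->msg.hdr.type = cpu_to_le16(RMR_MSG_MAP_ADD);
+ * err = rmr_clt_send_map_update(pool, iu);
+ * if (err)
+ * (*iu->conf)(iu->priv, err);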
+ */
+int rmr_clt_send_map_update(struct rmr_pool *pool, struct rmr_iu *iu)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_clt_sess_iu *sess_iu, *tmp_sess_iu;
+ struct rtrs_clt_req_ops req_ops;
+ struct kvec vec;
+ int err;
+
+ pr_debug("%s: rmr_id (%llu, %llu), msg %d, refcnt=%d\n", __func__,
+ iu->msg.id_a, iu->msg.id_b, iu->msg.hdr.type, refcount_read(&iu->refcount));
+
+ if (!pool) {
+ pr_err("Cannot send map update. pool is NULL\n");
+ return -EINVAL;
+ }
+
+ rmr_get_iu(iu);
+
+ vec = (struct kvec){
+ .iov_base = &iu->msg,
+ .iov_len = sizeof(iu->msg)
+ };
+
+ list_for_each_entry_safe(sess_iu, tmp_sess_iu, &(iu->sess_list), entry) {
+ struct rmr_clt_sess *clt_sess;
+ enum rmr_clt_pool_sess_state state;
+
+ pool_sess = sess_iu->pool_sess;
+ clt_sess = pool_sess->clt_sess;
+
+ INIT_WORK(&sess_iu->work, send_map_update_done);
+
+ req_ops = (struct rtrs_clt_req_ops) {
+ .priv = sess_iu,
+ .conf_fn = msg_conf,
+ };
+
+ state = atomic_read(&pool_sess->state);
+ if (state == RMR_CLT_POOL_SESS_FAILED ||
+ state == RMR_CLT_POOL_SESS_REMOVING) {
+ /*
+ * Sessions in FAILED state are probably the reason why we are
+ * sending the map add in the first place.
+ * We can skip those sessions, since the map update will take
+ * care of them.
+ */
+ pr_debug("%s: skipped sess %s\n", __func__, sess_iu->pool_sess->sessname);
+ sess_iu->errno = -EINVAL;
+ schedule_work(&sess_iu->work);
+ continue;
+ }
+
+ pr_debug("Sending request flags %u to pool %s session %s "
+ "chunk [%llu, %llu] offset %u length %u)\n",
+ iu->msg.flags, pool->poolname, pool_sess->sessname,
+ iu->msg.id_a, iu->msg.id_b,
+ iu->msg.offset, iu->msg.length);
+
+ trace_send_map_update(WRITE, sess_iu);
+
+ err = rtrs_clt_request(WRITE, &req_ops, clt_sess->rtrs,
+ sess_iu->permit, &vec, 1, 0, NULL, 0);
+
+ /*
+ * If the request fails right away, report the error through the
+ * completion work, just like an asynchronous completion would.
+ */
+ if (err) {
+ sess_iu->errno = err;
+
+ pr_err("%s: Failed with err %d, schedule work\n",
+ __func__, err);
+ schedule_work(&sess_iu->work);
+ }
+ }
+ rmr_put_iu(iu);
+
+ /*
+ * We are handling err through iu->conf
+ */
+ return 0;
+}
+EXPORT_SYMBOL(rmr_clt_send_map_update);
+
+int rmr_clt_map_add_id(struct rmr_pool *pool, int stg_id, rmr_id_t id)
+{
+ struct rmr_dirty_id_map *map;
+
+ map = rmr_pool_find_map(pool, stg_id);
+ if (!map) {
+ pr_err("in pool %s cannot find map for member_id %u\n",
+ pool->poolname, stg_id);
+ return -EINVAL;
+ }
+
+ map->ts = jiffies;
+ rmr_map_set_dirty(map, id, 0);
+
+ pr_debug("pool %s id (%llu, %llu) inserted to the dirty map\n",
+ pool->poolname, id.a, id.b);
+
+ return 0;
+}
+
+void sched_map_add(struct work_struct *work)
+{
+ struct rmr_iu *iu = container_of(work, struct rmr_iu, work);
+ struct rmr_pool *pool = iu->pool;
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_clt_sess_iu *sess_iu;
+ rmr_conf_fn *clt_conf = iu->conf;
+ void *clt_priv = iu->priv;
+ int failed_cnt = 0, err = 0;
+ rmr_id_t id;
+
+ pr_debug("scheduled work process for rmr iu %p send map add id (%llu, %llu), poolname %s\n",
+ iu, iu->msg.id_a, iu->msg.id_b, pool->poolname);
+
+ /*
+ * For MAP_ADD, we need failed_id, failed_cnt, and rmr_id_t for chunk number.
+ *
+ * We reuse the iu which was used for this IO.
+ * It already has the chunk number, the clt_conf function to be called,
+ * and other important things.
+ */
+ iu->msg.hdr.type = cpu_to_le16(RMR_MSG_MAP_ADD);
+
+ id.a = le64_to_cpu(iu->msg.id_a);
+ id.b = le64_to_cpu(iu->msg.id_b);
+ list_for_each_entry(sess_iu, &(iu->sess_list), entry) {
+ pool_sess = sess_iu->pool_sess;
+
+ if (sess_iu->errno) {
+ iu->msg.map_ver = cpu_to_le64(pool->map_ver);
+ iu->msg.failed_id[failed_cnt] = pool_sess->member_id;
+ failed_cnt++;
+
+ rmr_clt_map_add_id(pool, pool_sess->member_id, id);
+ }
+ }
+ iu->msg.failed_cnt = failed_cnt;
+
+ err = rmr_clt_send_map_update(pool, iu);
+ if (err) {
+ pr_err("error sending map add for id (%llu, %llu), err=%d\n",
+ iu->msg.id_a, iu->msg.id_b, err);
+ (*clt_conf)(clt_priv, err);
+ }
+}
+
+/**
+ * rmr_clt_send_map() - Send dirty map entries
+ *
+ * @map_src_pool: Pool whose map is to be sent
+ * @clt_pool: Client pool through which the dest session is selected
+ * @map_send_cmd: Command structure containing the member_id of the target session
+ * where the map is to be sent. If NULL then send to all of the sessions
+ * @filter: filter passed to rmr_pool_maps_to_buf() when serializing the dirty maps
+ *
+ * Return:
+ * 0 on success, err code otherwise.
+ *
+ * Description:
+ * Sends all the dirty entries from the map in "map_src_pool" to the session with
+ * member_id equal to member_id mentioned in the map_send_cmd.
+ * The session where to send the map is picked from the clt_pool. If
+ * map_send_cmd is NULL then send cmd to all of the sessions in clt_pool.
+ *
+ * Context:
+ * This function blocks while sending the map.
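+ *
+ * Example (sketch: broadcast all dirty entries of a pool to every one of its
+ * sessions, with no filter, as a NULL map_send_cmd implies):
+ *
+ * err = rmr_clt_send_map(pool, pool, NULL, MAP_NO_FILTER);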
+ */
+int rmr_clt_send_map(struct rmr_pool *map_src_pool, struct rmr_pool *clt_pool,
+ const struct rmr_msg_map_send_cmd *map_send_cmd, rmr_map_filter filter)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+ struct rmr_msg_pool_cmd msg = {};
+ bool sess_found = false;
+ void *bitmap_buf;
+ int err = 0, idx;
+
+ if (!clt_pool) {
+ pr_err("Cannot send map, when clt_pool is NULL\n");
+ return -EINVAL;
+ }
+
+ bitmap_buf = kzalloc(RTRS_IO_LIMIT, GFP_KERNEL);
+ if (!bitmap_buf) {
+ pr_err("%s: pool %s error allocating buffer to send map\n",
+ __func__, map_src_pool->poolname);
+ return -ENOMEM;
+ }
+
+ idx = srcu_read_lock(&clt_pool->sess_list_srcu);
+ list_for_each_entry_srcu(pool_sess, &clt_pool->sess_list, entry,
+ (srcu_read_lock_held(&clt_pool->sess_list_srcu))) {
+ int bytes = 0;
+ u8 map_idx = 0;
+ u64 slp_idx = 0;
+
+ /* if we have a command then skip all the sessions that are not in command */
+ if (map_send_cmd && pool_sess->member_id != map_send_cmd->receiver_member_id)
+ continue;
+
+ sess_found = true;
+ pr_info("Start sending dirty map for pool %s; to session %s with member_id %d\n",
+ map_src_pool->poolname, pool_sess->sessname, pool_sess->member_id);
+
+ while ((bytes = rmr_pool_maps_to_buf(map_src_pool, &map_idx, &slp_idx,
+ bitmap_buf, RTRS_IO_LIMIT, filter)) > 0) {
+ pr_debug("mapped %d bytes to bitmap_buf\n", bytes);
+
+ err = rmr_clt_pool_map_xfer(clt_pool, pool_sess, RMR_CMD_SEND_MAP_BUF,
+ bitmap_buf, bytes, 0, 0);
+ if (err) {
+ pr_err("%s: Failed to send bitmap_buf, from %s to %s err %d\n",
+ __func__, map_src_pool->poolname, clt_pool->poolname, err);
+ goto err_free;
+ }
+ }
+
+ rmr_clt_init_cmd(map_src_pool, &msg);
+ msg.cmd_type = RMR_CMD_MAP_BUF_DONE;
+ msg.map_buf_done_cmd.map_version = map_src_pool->map_ver;
+
+ err = rmr_clt_pool_send_cmd(pool_sess, &msg, WAIT);
+ if (err) {
+ pr_err("%s: For pool %s, %s failed\n",
+ __func__, map_src_pool->poolname, rmr_get_cmd_name(msg.cmd_type));
+ goto err_free;
+ }
+ }
+
+ if (map_send_cmd && !sess_found) {
+ pr_err("pool %s failed to find sess with member_id %u to send map\n",
+ clt_pool->poolname, map_send_cmd->receiver_member_id);
+ err = -EINVAL;
+ goto err_free;
+ }
+
+ pr_info("%s: Sending map done\n", __func__);
+
+err_free:
+ kfree(bitmap_buf);
+ srcu_read_unlock(&clt_pool->sess_list_srcu, idx);
+
+ return err;
+}
+EXPORT_SYMBOL(rmr_clt_send_map);
+
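+/**
+ * rmr_clt_test_map() - Send dirty maps of src_pool to dst_pool sessions for comparison
+ *
+ * @src_pool: pool whose dirty maps are serialized and sent
+ * @dst_pool: pool whose sessions receive the RMR_CMD_MAP_TEST transfers
+ *
+ * Description:
+ * Serializes all dirty maps of src_pool (MAP_NO_FILTER) and sends them to every
+ * session of dst_pool via RMR_CMD_MAP_TEST so the receiving node can compare the
+ * maps. Sessions in CREATED or FAILED state are skipped.
+ *
+ * Return:
+ * 0 on success, error code otherwise.
+ */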
+int rmr_clt_test_map(struct rmr_pool *src_pool, struct rmr_pool *dst_pool)
+{
+ struct rmr_clt_pool_sess *pool_sess;
+ void *bitmap_buf;
+ int err = 0, idx;
+
+ pr_info("test maps from src_pool=%s to dst_pool=%s...\n",
+ src_pool->poolname, dst_pool->poolname);
+
+ bitmap_buf = kzalloc(RTRS_IO_LIMIT, GFP_KERNEL);
+ if (!bitmap_buf) {
+ pr_err("%s: Error allocating buffer\n", __func__);
+ err = -ENOMEM;
+ goto err;
+ }
+
+ idx = srcu_read_lock(&dst_pool->sess_list_srcu);
+ list_for_each_entry_srcu(pool_sess, &dst_pool->sess_list, entry,
+ (srcu_read_lock_held(&dst_pool->sess_list_srcu))) {
+ enum rmr_clt_pool_sess_state state;
+ int bytes = 0;
+ u8 map_idx = 0;
+ u64 slp_idx = 0;
+
+ state = atomic_read(&pool_sess->state);
+ if (state == RMR_CLT_POOL_SESS_CREATED ||
+ state == RMR_CLT_POOL_SESS_FAILED) {
+ pr_warn("sess %s is in created/failed state, skip map test.\n",
+ pool_sess->sessname);
+ continue;
+ }
+ pr_info("perform map test for sess %s\n", pool_sess->sessname);
+ while ((bytes = rmr_pool_maps_to_buf(src_pool, &map_idx, &slp_idx,
+ bitmap_buf, RTRS_IO_LIMIT,
+ MAP_NO_FILTER)) > 0) {
+ pr_debug("mapped %d bytes to bitmap_buf\n", bytes);
+
+ err = rmr_clt_pool_map_xfer(dst_pool, pool_sess, RMR_CMD_MAP_TEST,
+ bitmap_buf, bytes, 0, 0);
+ if (err) {
+ pr_err("%s: For sess %s failed test map, src_pool %s dst_pool %s err %d\n",
+ __func__, pool_sess->sessname, src_pool->poolname,
+ dst_pool->poolname, err);
+ srcu_read_unlock(&dst_pool->sess_list_srcu, idx);
+ goto err_free;
+ }
+ }
+ pr_info("sess %s map test done\n", pool_sess->sessname);
+ }
+ srcu_read_unlock(&dst_pool->sess_list_srcu, idx);
+
+err_free:
+ kfree(bitmap_buf);
+err:
+ pr_info("test maps from src_pool=%s to dst_pool=%s done, err %d\n",
+ src_pool->poolname, dst_pool->poolname, err);
+
+ return err;
+}
+EXPORT_SYMBOL(rmr_clt_test_map);
--
2.43.0