* [Cluster-devel] [PATCH dlm-next 02/13] fs: dlm: remove unused processed_nodes
2023-07-27 13:22 [Cluster-devel] [PATCH dlm-next 01/13] fs: dlm: add missing spin_unlock Alexander Aring
@ 2023-07-27 13:22 ` Alexander Aring
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 03/13] fs: dlm: debugfs for queued callbacks Alexander Aring
` (10 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Alexander Aring @ 2023-07-27 13:22 UTC (permalink / raw)
To: cluster-devel.redhat.com
The variable processed_nodes is a leftover of commit 1696c75f1864
("fs: dlm: add send ack threshold and append acks to msgs") and is not
used anywhere. This patch removes it.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 1 -
1 file changed, 1 deletion(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 9f14ea9f6322..f7bc22e74db2 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -863,7 +863,6 @@ struct dlm_processed_nodes {
static void process_dlm_messages(struct work_struct *work)
{
struct processqueue_entry *pentry;
- LIST_HEAD(processed_nodes);
spin_lock(&processqueue_lock);
pentry = list_first_entry_or_null(&processqueue,
--
2.31.1
^ permalink raw reply related [flat|nested] 14+ messages in thread* [Cluster-devel] [PATCH dlm-next 03/13] fs: dlm: debugfs for queued callbacks
2023-07-27 13:22 [Cluster-devel] [PATCH dlm-next 01/13] fs: dlm: add missing spin_unlock Alexander Aring
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 02/13] fs: dlm: remove unused processed_nodes Alexander Aring
@ 2023-07-27 13:22 ` Alexander Aring
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 04/13] fs: dlm: check on plock ops when exit dlm Alexander Aring
` (9 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Alexander Aring @ 2023-07-27 13:22 UTC (permalink / raw)
To: cluster-devel.redhat.com
While debugging an issue with the callback queue, it was useful to check
whether any callbacks in any lkb were for some reason not processed by
the callback workqueue. That issue was fixed by commit a034c1370ded
("fs: dlm: fix DLM_IFL_CB_PENDING gets overwritten"). If a similar issue
shows up that looks like an ast callback was not processed, we can now
confirm whether it is still sitting in the queue waiting to be processed
by the callback workqueue.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/debug_fs.c | 101 +++++++++++++++++++++++++++++++++++++++++-
fs/dlm/dlm_internal.h | 1 +
2 files changed, 101 insertions(+), 1 deletion(-)
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index a1aca41c49d0..5aabcb6f0f15 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -18,6 +18,7 @@
#include "dlm_internal.h"
#include "midcomms.h"
#include "lock.h"
+#include "ast.h"
#define DLM_DEBUG_BUF_LEN 4096
static char debug_buf[DLM_DEBUG_BUF_LEN];
@@ -365,6 +366,52 @@ static void print_format4(struct dlm_rsb *r, struct seq_file *s)
unlock_rsb(r);
}
+static void print_format5_lock(struct seq_file *s, struct dlm_lkb *lkb)
+{
+ struct dlm_callback *cb;
+
+ /* one line per cb: lkb_id lkb_flags mode flags sb_status sb_flags */
+
+ spin_lock(&lkb->lkb_cb_lock);
+ list_for_each_entry(cb, &lkb->lkb_callbacks, list) {
+ seq_printf(s, "%x %x %d %x %d %x\n",
+ lkb->lkb_id,
+ dlm_iflags_val(lkb),
+ cb->mode,
+ cb->flags,
+ cb->sb_status,
+ cb->sb_flags);
+ }
+ spin_unlock(&lkb->lkb_cb_lock);
+}
+
+static void print_format5(struct dlm_rsb *r, struct seq_file *s)
+{
+ struct dlm_lkb *lkb;
+
+ lock_rsb(r);
+
+ list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
+ print_format5_lock(s, lkb);
+ if (seq_has_overflowed(s))
+ goto out;
+ }
+
+ list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
+ print_format5_lock(s, lkb);
+ if (seq_has_overflowed(s))
+ goto out;
+ }
+
+ list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue) {
+ print_format5_lock(s, lkb);
+ if (seq_has_overflowed(s))
+ goto out;
+ }
+ out:
+ unlock_rsb(r);
+}
+
struct rsbtbl_iter {
struct dlm_rsb *rsb;
unsigned bucket;
@@ -408,6 +455,13 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr)
}
print_format4(ri->rsb, seq);
break;
+ case 5:
+ if (ri->header) {
+ seq_puts(seq, "lkb_id lkb_flags mode flags sb_status sb_flags\n");
+ ri->header = 0;
+ }
+ print_format5(ri->rsb, seq);
+ break;
}
return 0;
@@ -417,6 +471,7 @@ static const struct seq_operations format1_seq_ops;
static const struct seq_operations format2_seq_ops;
static const struct seq_operations format3_seq_ops;
static const struct seq_operations format4_seq_ops;
+static const struct seq_operations format5_seq_ops;
static void *table_seq_start(struct seq_file *seq, loff_t *pos)
{
@@ -448,6 +503,8 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
ri->format = 3;
if (seq->op == &format4_seq_ops)
ri->format = 4;
+ if (seq->op == &format5_seq_ops)
+ ri->format = 5;
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
@@ -602,10 +659,18 @@ static const struct seq_operations format4_seq_ops = {
.show = table_seq_show,
};
+static const struct seq_operations format5_seq_ops = {
+ .start = table_seq_start,
+ .next = table_seq_next,
+ .stop = table_seq_stop,
+ .show = table_seq_show,
+};
+
static const struct file_operations format1_fops;
static const struct file_operations format2_fops;
static const struct file_operations format3_fops;
static const struct file_operations format4_fops;
+static const struct file_operations format5_fops;
static int table_open1(struct inode *inode, struct file *file)
{
@@ -683,7 +748,21 @@ static int table_open4(struct inode *inode, struct file *file)
struct seq_file *seq;
int ret;
ret = seq_open(file, &format4_seq_ops);
+ if (ret)
+ return ret;
+
+ seq = file->private_data;
+ seq->private = inode->i_private; /* the dlm_ls */
+ return 0;
+}
+
+static int table_open5(struct inode *inode, struct file *file)
+{
+ struct seq_file *seq;
+ int ret;
+
+ ret = seq_open(file, &format5_seq_ops);
if (ret)
return ret;
@@ -725,6 +804,14 @@ static const struct file_operations format4_fops = {
.release = seq_release
};
+static const struct file_operations format5_fops = {
+ .owner = THIS_MODULE,
+ .open = table_open5,
+ .read = seq_read,
+ .llseek = seq_lseek,
+ .release = seq_release
+};
+
/*
* dump lkb's on the ls_waiters list
*/
@@ -793,6 +880,7 @@ void dlm_delete_debug_file(struct dlm_ls *ls)
debugfs_remove(ls->ls_debug_locks_dentry);
debugfs_remove(ls->ls_debug_all_dentry);
debugfs_remove(ls->ls_debug_toss_dentry);
+ debugfs_remove(ls->ls_debug_queued_asts_dentry);
}
static int dlm_state_show(struct seq_file *file, void *offset)
@@ -936,6 +1024,17 @@ void dlm_create_debug_file(struct dlm_ls *ls)
dlm_root,
ls,
&waiters_fops);
+
+ /* format 5 */
+
+ memset(name, 0, sizeof(name));
+ snprintf(name, DLM_LOCKSPACE_LEN + 8, "%s_queued_asts", ls->ls_name);
+
+ ls->ls_debug_queued_asts_dentry = debugfs_create_file(name,
+ 0644,
+ dlm_root,
+ ls,
+ &format5_fops);
}
void __init dlm_register_debugfs(void)
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index c8156770205e..dfc444dad329 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -598,6 +598,7 @@ struct dlm_ls {
struct dentry *ls_debug_locks_dentry; /* debugfs */
struct dentry *ls_debug_all_dentry; /* debugfs */
struct dentry *ls_debug_toss_dentry; /* debugfs */
+ struct dentry *ls_debug_queued_asts_dentry; /* debugfs */
wait_queue_head_t ls_uevent_wait; /* user part of join/leave */
int ls_uevent_result;
--
2.31.1
^ permalink raw reply related [flat|nested] 14+ messages in thread* [Cluster-devel] [PATCH dlm-next 04/13] fs: dlm: check on plock ops when exit dlm
2023-07-27 13:22 [Cluster-devel] [PATCH dlm-next 01/13] fs: dlm: add missing spin_unlock Alexander Aring
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 02/13] fs: dlm: remove unused processed_nodes Alexander Aring
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 03/13] fs: dlm: debugfs for queued callbacks Alexander Aring
@ 2023-07-27 13:22 ` Alexander Aring
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 05/13] fs: dlm: add plock dev tracepoints Alexander Aring
` (8 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Alexander Aring @ 2023-07-27 13:22 UTC (permalink / raw)
To: cluster-devel.redhat.com
To be sure that no plock ops are left over in either send_list or
recv_list, we simply check that both lists are empty when we exit the
dlm subsystem and warn if one of them is not.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/plock.c | 2 ++
1 file changed, 2 insertions(+)
diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c
index 44b3aab5b709..5c2cc8d940ef 100644
--- a/fs/dlm/plock.c
+++ b/fs/dlm/plock.c
@@ -628,5 +628,7 @@ int dlm_plock_init(void)
void dlm_plock_exit(void)
{
misc_deregister(&plock_dev_misc);
+ WARN_ON(!list_empty(&send_list));
+ WARN_ON(!list_empty(&recv_list));
}
--
2.31.1
^ permalink raw reply related [flat|nested] 14+ messages in thread* [Cluster-devel] [PATCH dlm-next 05/13] fs: dlm: add plock dev tracepoints
2023-07-27 13:22 [Cluster-devel] [PATCH dlm-next 01/13] fs: dlm: add missing spin_unlock Alexander Aring
` (2 preceding siblings ...)
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 04/13] fs: dlm: check on plock ops when exit dlm Alexander Aring
@ 2023-07-27 13:22 ` Alexander Aring
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 06/13] fs: dlm: remove clear_members_cb Alexander Aring
` (7 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Alexander Aring @ 2023-07-27 13:22 UTC (permalink / raw)
To: cluster-devel.redhat.com
I am currently debugging nfs plock handling and introduce these two
tracepoints to get more information about what is happening when user
space reads plock operations from the kernel and writes the results
back.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/plock.c | 6 +++++
include/trace/events/dlm.h | 51 ++++++++++++++++++++++++++++++++++++++
2 files changed, 57 insertions(+)
diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c
index 5c2cc8d940ef..00e1d802a81c 100644
--- a/fs/dlm/plock.c
+++ b/fs/dlm/plock.c
@@ -11,6 +11,8 @@
#include <linux/dlm_plock.h>
#include <linux/slab.h>
+#include <trace/events/dlm.h>
+
#include "dlm_internal.h"
#include "lockspace.h"
@@ -509,6 +511,8 @@ static ssize_t dev_read(struct file *file, char __user *u, size_t count,
if (!op)
return -EAGAIN;
+ trace_dlm_plock_read(&info);
+
/* there is no need to get a reply from userspace for unlocks
that were generated by the vfs cleaning up for a close
(the process did not make an unlock call). */
@@ -536,6 +540,8 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count,
if (copy_from_user(&info, u, sizeof(info)))
return -EFAULT;
+ trace_dlm_plock_write(&info);
+
if (check_version(&info))
return -EINVAL;
diff --git a/include/trace/events/dlm.h b/include/trace/events/dlm.h
index 2b09574e1243..c1a146f9fc91 100644
--- a/include/trace/events/dlm.h
+++ b/include/trace/events/dlm.h
@@ -7,6 +7,7 @@
#include <linux/dlm.h>
#include <linux/dlmconstants.h>
+#include <uapi/linux/dlm_plock.h>
#include <linux/tracepoint.h>
#include "../../../fs/dlm/dlm_internal.h"
@@ -585,6 +586,56 @@ TRACE_EVENT(dlm_recv_message,
);
+DECLARE_EVENT_CLASS(dlm_plock_template,
+
+ TP_PROTO(const struct dlm_plock_info *info),
+
+ TP_ARGS(info),
+
+ TP_STRUCT__entry(
+ __field(uint8_t, optype)
+ __field(uint8_t, ex)
+ __field(uint8_t, wait)
+ __field(uint8_t, flags)
+ __field(uint32_t, pid)
+ __field(int32_t, nodeid)
+ __field(int32_t, rv)
+ __field(uint32_t, fsid)
+ __field(uint64_t, number)
+ __field(uint64_t, start)
+ __field(uint64_t, end)
+ __field(uint64_t, owner)
+ ),
+
+ TP_fast_assign(
+ __entry->optype = info->optype;
+ __entry->ex = info->ex;
+ __entry->wait = info->wait;
+ __entry->flags = info->flags;
+ __entry->pid = info->pid;
+ __entry->nodeid = info->nodeid;
+ __entry->rv = info->rv;
+ __entry->fsid = info->fsid;
+ __entry->number = info->number;
+ __entry->start = info->start;
+ __entry->end = info->end;
+ __entry->owner = info->owner;
+ ),
+
+ TP_printk("fsid=%u number=%llx owner=%llx optype=%d ex=%d wait=%d flags=%x pid=%u nodeid=%d rv=%d start=%llx end=%llx",
+ __entry->fsid, __entry->number, __entry->owner,
+ __entry->optype, __entry->ex, __entry->wait,
+ __entry->flags, __entry->pid, __entry->nodeid,
+ __entry->rv, __entry->start, __entry->end)
+
+);
+
+DEFINE_EVENT(dlm_plock_template, dlm_plock_read,
+ TP_PROTO(const struct dlm_plock_info *info), TP_ARGS(info));
+
+DEFINE_EVENT(dlm_plock_template, dlm_plock_write,
+ TP_PROTO(const struct dlm_plock_info *info), TP_ARGS(info));
+
TRACE_EVENT(dlm_send,
TP_PROTO(int nodeid, int ret),
--
2.31.1
^ permalink raw reply related [flat|nested] 14+ messages in thread* [Cluster-devel] [PATCH dlm-next 06/13] fs: dlm: remove clear_members_cb
2023-07-27 13:22 [Cluster-devel] [PATCH dlm-next 01/13] fs: dlm: add missing spin_unlock Alexander Aring
` (3 preceding siblings ...)
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 05/13] fs: dlm: add plock dev tracepoints Alexander Aring
@ 2023-07-27 13:22 ` Alexander Aring
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 07/13] fs: dlm: cleanup lock order Alexander Aring
` (6 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Alexander Aring @ 2023-07-27 13:22 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch is just a small cleanup to directly call
remove_remote_member() instead of going over clear_members_cb() which
just calls remove_remote_member().
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/member.c | 7 +------
1 file changed, 1 insertion(+), 6 deletions(-)
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index 77d202e4a02a..f303ea8bd256 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -393,14 +393,9 @@ static void remove_remote_member(int nodeid)
dlm_midcomms_remove_member(nodeid);
}
-static void clear_members_cb(int nodeid)
-{
- remove_remote_member(nodeid);
-}
-
void dlm_clear_members(struct dlm_ls *ls)
{
- clear_memb_list(&ls->ls_nodes, clear_members_cb);
+ clear_memb_list(&ls->ls_nodes, remove_remote_member);
ls->ls_num_nodes = 0;
}
--
2.31.1
^ permalink raw reply related [flat|nested] 14+ messages in thread* [Cluster-devel] [PATCH dlm-next 07/13] fs: dlm: cleanup lock order
2023-07-27 13:22 [Cluster-devel] [PATCH dlm-next 01/13] fs: dlm: add missing spin_unlock Alexander Aring
` (4 preceding siblings ...)
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 06/13] fs: dlm: remove clear_members_cb Alexander Aring
@ 2023-07-27 13:22 ` Alexander Aring
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 08/13] fs: dlm: get recovery sequence number as parameter Alexander Aring
` (5 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Alexander Aring @ 2023-07-27 13:22 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch cleans up the lock order to take the close_lock first and
then the nodes_srcu read lock. The previous order would probably never
have been a problem, as nodes_srcu is only a read lock preventing the
node pointer from getting freed.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/midcomms.c | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index e1a0df67b566..8ebffbfdc00a 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -1489,12 +1489,12 @@ int dlm_midcomms_close(int nodeid)
synchronize_srcu(&nodes_srcu);
- idx = srcu_read_lock(&nodes_srcu);
mutex_lock(&close_lock);
+ idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, 0);
if (!node) {
- mutex_unlock(&close_lock);
srcu_read_unlock(&nodes_srcu, idx);
+ mutex_unlock(&close_lock);
return dlm_lowcomms_close(nodeid);
}
--
2.31.1
^ permalink raw reply related [flat|nested] 14+ messages in thread* [Cluster-devel] [PATCH dlm-next 08/13] fs: dlm: get recovery sequence number as parameter
2023-07-27 13:22 [Cluster-devel] [PATCH dlm-next 01/13] fs: dlm: add missing spin_unlock Alexander Aring
` (5 preceding siblings ...)
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 07/13] fs: dlm: cleanup lock order Alexander Aring
@ 2023-07-27 13:22 ` Alexander Aring
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 09/13] fs: dlm: drop rxbuf manipulation in dlm_copy_master_names Alexander Aring
` (4 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Alexander Aring @ 2023-07-27 13:22 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch removes a read of the uint64_t ls->ls_recover_seq number in
_create_rcom(). Whenever ls->ls_recover_seq is read, the ls_recover_lock
needs to be held. However, this number has always been read already by
the time any rcom message is received, so it is not necessary to read it
again from the per-lockspace variable in order to use it for the reply
message. This patch passes the sequence number as a parameter so that
another read of ls->ls_recover_seq, and holding ls->ls_recover_lock for
it, is not required.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/dir.c | 4 +--
fs/dlm/dir.h | 2 +-
fs/dlm/lock.c | 5 ++--
fs/dlm/lock.h | 3 ++-
fs/dlm/member.c | 6 ++---
fs/dlm/rcom.c | 68 ++++++++++++++++++++++++++---------------------
fs/dlm/rcom.h | 10 ++++---
fs/dlm/recover.c | 58 +++++++++++++++++++++-------------------
fs/dlm/recover.h | 12 ++++-----
fs/dlm/recoverd.c | 16 +++++------
10 files changed, 99 insertions(+), 85 deletions(-)
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index fb1981654bb2..3bf5bf7a37b4 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -58,7 +58,7 @@ void dlm_recover_dir_nodeid(struct dlm_ls *ls)
up_read(&ls->ls_root_sem);
}
-int dlm_recover_directory(struct dlm_ls *ls)
+int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
{
struct dlm_member *memb;
char *b, *last_name = NULL;
@@ -90,7 +90,7 @@ int dlm_recover_directory(struct dlm_ls *ls)
}
error = dlm_rcom_names(ls, memb->nodeid,
- last_name, last_len);
+ last_name, last_len, seq);
if (error)
goto out_free;
diff --git a/fs/dlm/dir.h b/fs/dlm/dir.h
index 03844d086be2..0635582da003 100644
--- a/fs/dlm/dir.h
+++ b/fs/dlm/dir.h
@@ -15,7 +15,7 @@
int dlm_dir_nodeid(struct dlm_rsb *rsb);
int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash);
void dlm_recover_dir_nodeid(struct dlm_ls *ls);
-int dlm_recover_directory(struct dlm_ls *ls);
+int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq);
void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
char *outbuf, int outlen, int nodeid);
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index f511a9d7d416..b489da38e685 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -5464,7 +5464,8 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
}
/* needs at least dlm_rcom + rcom_lock */
-int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
+int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
+ uint64_t seq)
{
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
struct dlm_rsb *r;
@@ -5509,7 +5510,7 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
result);
- dlm_send_rcom_lock(r, lkb);
+ dlm_send_rcom_lock(r, lkb, seq);
goto out;
case -EEXIST:
case 0:
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index aa5ad44d902b..222e682523b9 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -37,7 +37,8 @@ void dlm_recover_grant(struct dlm_ls *ls);
int dlm_recover_waiters_post(struct dlm_ls *ls);
void dlm_recover_waiters_pre(struct dlm_ls *ls);
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
-int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
+int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
+ uint64_t seq);
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode,
uint32_t flags, void *name, unsigned int namelen);
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index f303ea8bd256..19f3cd96f3c0 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -449,7 +449,7 @@ static void make_member_array(struct dlm_ls *ls)
/* send a status request to all members just to establish comms connections */
-static int ping_members(struct dlm_ls *ls)
+static int ping_members(struct dlm_ls *ls, uint64_t seq)
{
struct dlm_member *memb;
int error = 0;
@@ -459,7 +459,7 @@ static int ping_members(struct dlm_ls *ls)
error = -EINTR;
break;
}
- error = dlm_rcom_status(ls, memb->nodeid, 0);
+ error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
if (error)
break;
}
@@ -607,7 +607,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
make_member_array(ls);
*neg_out = neg;
- error = ping_members(ls);
+ error = ping_members(ls, rv->seq);
log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
return error;
}
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index f4afdf892f78..efe45e68287f 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -28,7 +28,8 @@ static int rcom_response(struct dlm_ls *ls)
}
static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
- struct dlm_rcom **rc_ret, char *mb, int mb_len)
+ struct dlm_rcom **rc_ret, char *mb, int mb_len,
+ uint64_t seq)
{
struct dlm_rcom *rc;
@@ -41,16 +42,14 @@ static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
rc->rc_header.h_cmd = DLM_RCOM;
rc->rc_type = cpu_to_le32(type);
-
- spin_lock(&ls->ls_recover_lock);
- rc->rc_seq = cpu_to_le64(ls->ls_recover_seq);
- spin_unlock(&ls->ls_recover_lock);
+ rc->rc_seq = cpu_to_le64(seq);
*rc_ret = rc;
}
static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
- struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
+ struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret,
+ uint64_t seq)
{
int mb_len = sizeof(struct dlm_rcom) + len;
struct dlm_mhandle *mh;
@@ -63,14 +62,14 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
return -ENOBUFS;
}
- _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
+ _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
*mh_ret = mh;
return 0;
}
static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
int len, struct dlm_rcom **rc_ret,
- struct dlm_msg **msg_ret)
+ struct dlm_msg **msg_ret, uint64_t seq)
{
int mb_len = sizeof(struct dlm_rcom) + len;
struct dlm_msg *msg;
@@ -84,7 +83,7 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
return -ENOBUFS;
}
- _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
+ _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
*msg_ret = msg;
return 0;
}
@@ -170,7 +169,8 @@ static void disallow_sync_reply(struct dlm_ls *ls)
* node's rcom_config.
*/
-int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
+int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags,
+ uint64_t seq)
{
struct dlm_rcom *rc;
struct dlm_msg *msg;
@@ -186,7 +186,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
retry:
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
- sizeof(struct rcom_status), &rc, &msg);
+ sizeof(struct rcom_status), &rc, &msg,
+ seq);
if (error)
goto out;
@@ -220,7 +221,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
return error;
}
-static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in,
+ uint64_t seq)
{
struct dlm_rcom *rc;
struct rcom_status *rs;
@@ -251,7 +253,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
do_create:
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
- len, &rc, &msg);
+ len, &rc, &msg, seq);
if (error)
return;
@@ -302,7 +304,8 @@ static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
spin_unlock(&ls->ls_rcom_spin);
}
-int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
+int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
+ int last_len, uint64_t seq)
{
struct dlm_rcom *rc;
struct dlm_msg *msg;
@@ -312,7 +315,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
retry:
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len,
- &rc, &msg);
+ &rc, &msg, seq);
if (error)
goto out;
memcpy(rc->rc_buf, last_name, last_len);
@@ -330,7 +333,8 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
return error;
}
-static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in,
+ uint64_t seq)
{
struct dlm_rcom *rc;
int error, inlen, outlen, nodeid;
@@ -342,7 +346,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
- &rc, &msg);
+ &rc, &msg, seq);
if (error)
return;
rc->rc_id = rc_in->rc_id;
@@ -353,7 +357,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
send_rcom_stateless(msg, rc);
}
-int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
+int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
@@ -361,7 +365,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
int error;
error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
- &rc, &mh);
+ &rc, &mh, seq);
if (error)
goto out;
memcpy(rc->rc_buf, r->res_name, r->res_length);
@@ -372,7 +376,8 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
return error;
}
-static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in,
+ uint64_t seq)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
@@ -387,7 +392,8 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
return;
}
- error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
+ error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh,
+ seq);
if (error)
return;
@@ -437,7 +443,7 @@ static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
}
-int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq)
{
struct dlm_ls *ls = r->res_ls;
struct dlm_rcom *rc;
@@ -448,7 +454,8 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
if (lkb->lkb_lvbptr)
len += ls->ls_lvblen;
- error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
+ error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh,
+ seq);
if (error)
goto out;
@@ -462,7 +469,8 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
}
/* needs at least dlm_rcom + rcom_lock */
-static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in,
+ uint64_t seq)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
@@ -471,7 +479,7 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
dlm_recover_master_copy(ls, rc_in);
error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
- sizeof(struct rcom_lock), &rc, &mh);
+ sizeof(struct rcom_lock), &rc, &mh, seq);
if (error)
return;
@@ -620,21 +628,21 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
switch (rc->rc_type) {
case cpu_to_le32(DLM_RCOM_STATUS):
- receive_rcom_status(ls, rc);
+ receive_rcom_status(ls, rc, seq);
break;
case cpu_to_le32(DLM_RCOM_NAMES):
- receive_rcom_names(ls, rc);
+ receive_rcom_names(ls, rc, seq);
break;
case cpu_to_le32(DLM_RCOM_LOOKUP):
- receive_rcom_lookup(ls, rc);
+ receive_rcom_lookup(ls, rc, seq);
break;
case cpu_to_le32(DLM_RCOM_LOCK):
if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
goto Eshort;
- receive_rcom_lock(ls, rc);
+ receive_rcom_lock(ls, rc, seq);
break;
case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
@@ -652,7 +660,7 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
goto Eshort;
- dlm_recover_process_copy(ls, rc);
+ dlm_recover_process_copy(ls, rc, seq);
break;
default:
diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h
index 454d3c4814ab..9dd06d43ddb4 100644
--- a/fs/dlm/rcom.h
+++ b/fs/dlm/rcom.h
@@ -12,10 +12,12 @@
#ifndef __RCOM_DOT_H__
#define __RCOM_DOT_H__
-int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags);
-int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
-int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
-int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
+int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags,
+ uint64_t seq);
+int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
+ int last_len, uint64_t seq);
+int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq);
+int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq);
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 29d71a5018d4..ddb6b3312cc1 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -93,7 +93,7 @@ void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
}
static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
- int save_slots)
+ int save_slots, uint64_t seq)
{
struct dlm_rcom *rc = ls->ls_recover_buf;
struct dlm_member *memb;
@@ -107,7 +107,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
goto out;
}
- error = dlm_rcom_status(ls, memb->nodeid, 0);
+ error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
if (error)
goto out;
@@ -126,7 +126,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
}
static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
- uint32_t status_flags)
+ uint32_t status_flags, uint64_t seq)
{
struct dlm_rcom *rc = ls->ls_recover_buf;
int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
@@ -137,7 +137,7 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
goto out;
}
- error = dlm_rcom_status(ls, nodeid, status_flags);
+ error = dlm_rcom_status(ls, nodeid, status_flags, seq);
if (error)
break;
@@ -151,22 +151,22 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
return error;
}
-static int wait_status(struct dlm_ls *ls, uint32_t status)
+static int wait_status(struct dlm_ls *ls, uint32_t status, uint64_t seq)
{
uint32_t status_all = status << 1;
int error;
if (ls->ls_low_nodeid == dlm_our_nodeid()) {
- error = wait_status_all(ls, status, 0);
+ error = wait_status_all(ls, status, 0, seq);
if (!error)
dlm_set_recover_status(ls, status_all);
} else
- error = wait_status_low(ls, status_all, 0);
+ error = wait_status_low(ls, status_all, 0, seq);
return error;
}
-int dlm_recover_members_wait(struct dlm_ls *ls)
+int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq)
{
struct dlm_member *memb;
struct dlm_slot *slots;
@@ -180,7 +180,7 @@ int dlm_recover_members_wait(struct dlm_ls *ls)
}
if (ls->ls_low_nodeid == dlm_our_nodeid()) {
- error = wait_status_all(ls, DLM_RS_NODES, 1);
+ error = wait_status_all(ls, DLM_RS_NODES, 1, seq);
if (error)
goto out;
@@ -199,7 +199,8 @@ int dlm_recover_members_wait(struct dlm_ls *ls)
dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
}
} else {
- error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
+ error = wait_status_low(ls, DLM_RS_NODES_ALL,
+ DLM_RSF_NEED_SLOTS, seq);
if (error)
goto out;
@@ -209,19 +210,19 @@ int dlm_recover_members_wait(struct dlm_ls *ls)
return error;
}
-int dlm_recover_directory_wait(struct dlm_ls *ls)
+int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq)
{
- return wait_status(ls, DLM_RS_DIR);
+ return wait_status(ls, DLM_RS_DIR, seq);
}
-int dlm_recover_locks_wait(struct dlm_ls *ls)
+int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq)
{
- return wait_status(ls, DLM_RS_LOCKS);
+ return wait_status(ls, DLM_RS_LOCKS, seq);
}
-int dlm_recover_done_wait(struct dlm_ls *ls)
+int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq)
{
- return wait_status(ls, DLM_RS_DONE);
+ return wait_status(ls, DLM_RS_DONE, seq);
}
/*
@@ -441,7 +442,7 @@ static void set_new_master(struct dlm_rsb *r)
* equals our_nodeid below).
*/
-static int recover_master(struct dlm_rsb *r, unsigned int *count)
+static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
{
struct dlm_ls *ls = r->res_ls;
int our_nodeid, dir_nodeid;
@@ -472,7 +473,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count)
error = 0;
} else {
recover_idr_add(r);
- error = dlm_send_rcom_lookup(r, dir_nodeid);
+ error = dlm_send_rcom_lookup(r, dir_nodeid, seq);
}
(*count)++;
@@ -520,7 +521,7 @@ static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
* the correct dir node.
*/
-int dlm_recover_masters(struct dlm_ls *ls)
+int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
{
struct dlm_rsb *r;
unsigned int total = 0;
@@ -542,7 +543,7 @@ int dlm_recover_masters(struct dlm_ls *ls)
if (nodir)
error = recover_master_static(r, &count);
else
- error = recover_master(r, &count);
+ error = recover_master(r, &count, seq);
unlock_rsb(r);
cond_resched();
total++;
@@ -614,13 +615,14 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
* an equal number of replies then recovery for the rsb is done
*/
-static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
+static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head,
+ uint64_t seq)
{
struct dlm_lkb *lkb;
int error = 0;
list_for_each_entry(lkb, head, lkb_statequeue) {
- error = dlm_send_rcom_lock(r, lkb);
+ error = dlm_send_rcom_lock(r, lkb, seq);
if (error)
break;
r->res_recover_locks_count++;
@@ -629,7 +631,7 @@ static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
return error;
}
-static int recover_locks(struct dlm_rsb *r)
+static int recover_locks(struct dlm_rsb *r, uint64_t seq)
{
int error = 0;
@@ -637,13 +639,13 @@ static int recover_locks(struct dlm_rsb *r)
DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););
- error = recover_locks_queue(r, &r->res_grantqueue);
+ error = recover_locks_queue(r, &r->res_grantqueue, seq);
if (error)
goto out;
- error = recover_locks_queue(r, &r->res_convertqueue);
+ error = recover_locks_queue(r, &r->res_convertqueue, seq);
if (error)
goto out;
- error = recover_locks_queue(r, &r->res_waitqueue);
+ error = recover_locks_queue(r, &r->res_waitqueue, seq);
if (error)
goto out;
@@ -656,7 +658,7 @@ static int recover_locks(struct dlm_rsb *r)
return error;
}
-int dlm_recover_locks(struct dlm_ls *ls)
+int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
{
struct dlm_rsb *r;
int error, count = 0;
@@ -677,7 +679,7 @@ int dlm_recover_locks(struct dlm_ls *ls)
goto out;
}
- error = recover_locks(r);
+ error = recover_locks(r, seq);
if (error) {
up_read(&ls->ls_root_sem);
goto out;
diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h
index 235e0d25cd48..c5ce2ef13934 100644
--- a/fs/dlm/recover.h
+++ b/fs/dlm/recover.h
@@ -15,13 +15,13 @@
int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls));
uint32_t dlm_recover_status(struct dlm_ls *ls);
void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status);
-int dlm_recover_members_wait(struct dlm_ls *ls);
-int dlm_recover_directory_wait(struct dlm_ls *ls);
-int dlm_recover_locks_wait(struct dlm_ls *ls);
-int dlm_recover_done_wait(struct dlm_ls *ls);
-int dlm_recover_masters(struct dlm_ls *ls);
+int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq);
+int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq);
+int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq);
+int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq);
+int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc);
-int dlm_recover_locks(struct dlm_ls *ls);
+int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq);
void dlm_recovered_lock(struct dlm_rsb *r);
int dlm_create_root_list(struct dlm_ls *ls);
void dlm_release_root_list(struct dlm_ls *ls);
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 19da816cfb09..4d17491dea2f 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -90,7 +90,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_set_recover_status(ls, DLM_RS_NODES);
- error = dlm_recover_members_wait(ls);
+ error = dlm_recover_members_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_members_wait error %d", error);
goto fail;
@@ -103,7 +103,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* nodes their master rsb names that hash to us.
*/
- error = dlm_recover_directory(ls);
+ error = dlm_recover_directory(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_directory error %d", error);
goto fail;
@@ -111,7 +111,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_set_recover_status(ls, DLM_RS_DIR);
- error = dlm_recover_directory_wait(ls);
+ error = dlm_recover_directory_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
goto fail;
@@ -145,7 +145,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* departed nodes.
*/
- error = dlm_recover_masters(ls);
+ error = dlm_recover_masters(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_masters error %d", error);
goto fail;
@@ -155,7 +155,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* Send our locks on remastered rsb's to the new masters.
*/
- error = dlm_recover_locks(ls);
+ error = dlm_recover_locks(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_locks error %d", error);
goto fail;
@@ -163,7 +163,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_set_recover_status(ls, DLM_RS_LOCKS);
- error = dlm_recover_locks_wait(ls);
+ error = dlm_recover_locks_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
goto fail;
@@ -187,7 +187,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
*/
dlm_set_recover_status(ls, DLM_RS_LOCKS);
- error = dlm_recover_locks_wait(ls);
+ error = dlm_recover_locks_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
goto fail;
@@ -206,7 +206,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_set_recover_status(ls, DLM_RS_DONE);
- error = dlm_recover_done_wait(ls);
+ error = dlm_recover_done_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_done_wait error %d", error);
goto fail;
--
2.31.1
^ permalink raw reply related [flat|nested] 14+ messages in thread* [Cluster-devel] [PATCH dlm-next 09/13] fs: dlm: drop rxbuf manipulation in dlm_copy_master_names
2023-07-27 13:22 [Cluster-devel] [PATCH dlm-next 01/13] fs: dlm: add missing spin_unlock Alexander Aring
` (6 preceding siblings ...)
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 08/13] fs: dlm: get recovery sequence number as parameter Alexander Aring
@ 2023-07-27 13:22 ` Alexander Aring
2023-07-27 13:23 ` [Cluster-devel] [PATCH dlm-next 10/13] fs: dlm: drop rxbuf manipulation in dlm_recover_master_copy Alexander Aring
` (3 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Alexander Aring @ 2023-07-27 13:22 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch removes the manipulation of the receive buffer that was done
in the error case to make sure the buffer is null terminated before an
error message is printed out. Instead of manipulating the receive
buffer, we specify inside the format string the maximum length to read
from the string buffer.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/dir.c | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 3bf5bf7a37b4..768cf8d43b2b 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -245,9 +245,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
if (inlen > 1) {
r = find_rsb_root(ls, inbuf, inlen);
if (!r) {
- inbuf[inlen - 1] = '\0';
- log_error(ls, "copy_master_names from %d start %d %s",
- nodeid, inlen, inbuf);
+ log_error(ls, "copy_master_names from %d start %d %.*s",
+ nodeid, inlen, inlen, inbuf);
goto out;
}
list = r->res_root_list.next;
--
2.31.1
^ permalink raw reply related [flat|nested] 14+ messages in thread* [Cluster-devel] [PATCH dlm-next 10/13] fs: dlm: drop rxbuf manipulation in dlm_recover_master_copy
2023-07-27 13:22 [Cluster-devel] [PATCH dlm-next 01/13] fs: dlm: add missing spin_unlock Alexander Aring
` (7 preceding siblings ...)
2023-07-27 13:22 ` [Cluster-devel] [PATCH dlm-next 09/13] fs: dlm: drop rxbuf manipulation in dlm_copy_master_names Alexander Aring
@ 2023-07-27 13:23 ` Alexander Aring
2023-07-27 13:23 ` [Cluster-devel] [PATCH dlm-next 11/13] fs: dlm: constify receive buffer Alexander Aring
` (2 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Alexander Aring @ 2023-07-27 13:23 UTC (permalink / raw)
To: cluster-devel.redhat.com
Currently dlm_recover_master_copy() manipulates the receive buffer of an
rcom lock message and modifies it on the fly so that a later memcpy() of
the same message into a new rcom message carries those new values. This
patch avoids manipulating the received rcom message by storing the
values for the new rcom message in parameters passed by reference.
Later, when receive_rcom_lock() constructs the new message and memcpy()s
the receive buffer into it, those values are set on the newly
constructed message.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 10 +++++++---
fs/dlm/lock.h | 3 ++-
fs/dlm/rcom.c | 12 ++++++++----
3 files changed, 17 insertions(+), 8 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index b489da38e685..1cf666c7401d 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -5384,7 +5384,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
back the rcom_lock struct we got but with the remid field filled in. */
/* needs at least dlm_rcom + rcom_lock */
-int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
+int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
+ __le32 *rl_remid, __le32 *rl_result)
{
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
struct dlm_rsb *r;
@@ -5393,6 +5394,9 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
int error;
+ /* init rl_remid with rcom lock rl_remid */
+ *rl_remid = rl->rl_remid;
+
if (rl->rl_parent_lkid) {
error = -EOPNOTSUPP;
goto out;
@@ -5448,7 +5452,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
out_remid:
/* this is the new value returned to the lock holder for
saving in its process-copy lkb */
- rl->rl_remid = cpu_to_le32(lkb->lkb_id);
+ *rl_remid = cpu_to_le32(lkb->lkb_id);
lkb->lkb_recover_seq = ls->ls_recover_seq;
@@ -5459,7 +5463,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
if (error && error != -EEXIST)
log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
from_nodeid, remid, error);
- rl->rl_result = cpu_to_le32(error);
+ *rl_result = cpu_to_le32(error);
return error;
}
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 222e682523b9..cd67ccfbbf9b 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -36,7 +36,8 @@ void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
void dlm_recover_grant(struct dlm_ls *ls);
int dlm_recover_waiters_post(struct dlm_ls *ls);
void dlm_recover_waiters_pre(struct dlm_ls *ls);
-int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
+int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
+ __le32 *rl_remid, __le32 *rl_result);
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
uint64_t seq);
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index efe45e68287f..0946431e370a 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -472,21 +472,25 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq)
static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in,
uint64_t seq)
{
+ __le32 rl_remid, rl_result;
+ struct rcom_lock *rl;
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
- dlm_recover_master_copy(ls, rc_in);
+ dlm_recover_master_copy(ls, rc_in, &rl_remid, &rl_result);
error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
sizeof(struct rcom_lock), &rc, &mh, seq);
if (error)
return;
- /* We send back the same rcom_lock struct we received, but
- dlm_recover_master_copy() has filled in rl_remid and rl_result */
-
memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
+ rl = (struct rcom_lock *)rc->rc_buf;
+ /* set rl_remid and rl_result from dlm_recover_master_copy() */
+ rl->rl_remid = rl_remid;
+ rl->rl_result = rl_result;
+
rc->rc_id = rc_in->rc_id;
rc->rc_seq_reply = rc_in->rc_seq;
--
2.31.1
^ permalink raw reply related [flat|nested] 14+ messages in thread* [Cluster-devel] [PATCH dlm-next 11/13] fs: dlm: constify receive buffer
2023-07-27 13:22 [Cluster-devel] [PATCH dlm-next 01/13] fs: dlm: add missing spin_unlock Alexander Aring
` (8 preceding siblings ...)
2023-07-27 13:23 ` [Cluster-devel] [PATCH dlm-next 10/13] fs: dlm: drop rxbuf manipulation in dlm_recover_master_copy Alexander Aring
@ 2023-07-27 13:23 ` Alexander Aring
2023-07-27 13:23 ` [Cluster-devel] [PATCH dlm-next 12/13] fs: dlm: create midcomms nodes when configure Alexander Aring
2023-07-27 13:23 ` [Cluster-devel] [PATCH dlm-next 13/13] fs: dlm: don't use RCOM_NAMES for version detection Alexander Aring
11 siblings, 0 replies; 14+ messages in thread
From: Alexander Aring @ 2023-07-27 13:23 UTC (permalink / raw)
To: cluster-devel.redhat.com
The dlm receive buffer should never be manipulated, as DLM is the last
instance of the parsing layer. This patch constifies the whole receive
buffer so we are sure it never gets manipulated while it is being
parsed.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/dir.c | 5 +-
fs/dlm/dir.h | 4 +-
fs/dlm/lock.c | 109 ++++++++++++++++++++++--------------------
fs/dlm/lock.h | 14 +++---
fs/dlm/member.c | 2 +-
fs/dlm/member.h | 2 +-
fs/dlm/midcomms.c | 16 ++++---
fs/dlm/rcom.c | 20 ++++----
fs/dlm/rcom.h | 5 +-
fs/dlm/recover.c | 2 +-
fs/dlm/recover.h | 2 +-
fs/dlm/requestqueue.c | 3 +-
fs/dlm/requestqueue.h | 3 +-
13 files changed, 101 insertions(+), 86 deletions(-)
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 768cf8d43b2b..f6acba4310a7 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -196,7 +196,8 @@ int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
return error;
}
-static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
+static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
+ int len)
{
struct dlm_rsb *r;
uint32_t hash, bucket;
@@ -232,7 +233,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
for rsb's we're master of and whose directory node matches the requesting
node. inbuf is the rsb name last sent, inlen is the name's length */
-void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
+void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
char *outbuf, int outlen, int nodeid)
{
struct list_head *list;
diff --git a/fs/dlm/dir.h b/fs/dlm/dir.h
index 0635582da003..39ecb69d7ef3 100644
--- a/fs/dlm/dir.h
+++ b/fs/dlm/dir.h
@@ -16,8 +16,8 @@ int dlm_dir_nodeid(struct dlm_rsb *rsb);
int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash);
void dlm_recover_dir_nodeid(struct dlm_ls *ls);
int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq);
-void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
- char *outbuf, int outlen, int nodeid);
+void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
+ char *outbuf, int outlen, int nodeid);
#endif /* __DIR_DOT_H__ */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 1cf666c7401d..652c51fbbf76 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -86,8 +86,8 @@ static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
- struct dlm_message *ms, bool local);
-static int receive_extralen(struct dlm_message *ms);
+ const struct dlm_message *ms, bool local);
+static int receive_extralen(const struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void toss_rsb(struct kref *kref);
@@ -984,8 +984,8 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
* . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
*/
-int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
- unsigned int flags, int *r_nodeid, int *result)
+int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
+ int len, unsigned int flags, int *r_nodeid, int *result)
{
struct dlm_rsb *r = NULL;
uint32_t hash, b;
@@ -1106,7 +1106,7 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
}
}
-void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
+void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
{
struct dlm_rsb *r = NULL;
uint32_t hash, b;
@@ -1459,7 +1459,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
set RESEND and dlm_recover_waiters_post() */
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
- struct dlm_message *ms)
+ const struct dlm_message *ms)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int overlap_done = 0;
@@ -1557,8 +1557,8 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
/* Handles situations where we might be processing a "fake" or "local" reply in
which we can't try to take waiters_mutex again. */
-static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms,
- bool local)
+static int remove_from_waiters_ms(struct dlm_lkb *lkb,
+ const struct dlm_message *ms, bool local)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error;
@@ -1800,7 +1800,7 @@ static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
/* lkb is process copy (pc) */
static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
- struct dlm_message *ms)
+ const struct dlm_message *ms)
{
int b;
@@ -1907,7 +1907,7 @@ static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
}
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
- struct dlm_message *ms)
+ const struct dlm_message *ms)
{
set_lvb_lock_pc(r, lkb, ms);
_grant_lock(r, lkb);
@@ -1945,7 +1945,7 @@ static void munge_demoted(struct dlm_lkb *lkb)
lkb->lkb_grmode = DLM_LOCK_NL;
}
-static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
+static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
{
if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
@@ -3641,8 +3641,9 @@ static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}
-static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
- int ret_nodeid, int rv)
+static int send_lookup_reply(struct dlm_ls *ls,
+ const struct dlm_message *ms_in, int ret_nodeid,
+ int rv)
{
struct dlm_rsb *r = &ls->ls_local_rsb;
struct dlm_message *ms;
@@ -3667,14 +3668,15 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
of message, unlike the send side where we can safely send everything about
the lkb for any type of message */
-static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
+static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
{
lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
}
-static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
+static void receive_flags_reply(struct dlm_lkb *lkb,
+ const struct dlm_message *ms,
bool local)
{
if (local)
@@ -3684,14 +3686,14 @@ static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
}
-static int receive_extralen(struct dlm_message *ms)
+static int receive_extralen(const struct dlm_message *ms)
{
return (le16_to_cpu(ms->m_header.h_length) -
sizeof(struct dlm_message));
}
static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
- struct dlm_message *ms)
+ const struct dlm_message *ms)
{
int len;
@@ -3719,7 +3721,7 @@ static void fake_astfn(void *astparam)
}
static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
- struct dlm_message *ms)
+ const struct dlm_message *ms)
{
lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
@@ -3741,7 +3743,7 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
}
static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
- struct dlm_message *ms)
+ const struct dlm_message *ms)
{
if (lkb->lkb_status != DLM_LKSTS_GRANTED)
return -EBUSY;
@@ -3756,7 +3758,7 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
}
static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
- struct dlm_message *ms)
+ const struct dlm_message *ms)
{
if (receive_lvb(ls, lkb, ms))
return -ENOMEM;
@@ -3766,7 +3768,7 @@ static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
/* We fill in the local-lkb fields with the info that send_xxxx_reply()
uses to send a reply and that the remote end uses to process the reply. */
-static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms)
+static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
{
struct dlm_lkb *lkb = &ls->ls_local_lkb;
lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
@@ -3776,7 +3778,7 @@ static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms)
/* This is called after the rsb is locked so that we can safely inspect
fields in the lkb. */
-static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
+static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
{
int from = le32_to_cpu(ms->m_header.h_nodeid);
int error = 0;
@@ -3828,7 +3830,7 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
return error;
}
-static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -3907,7 +3909,7 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
return error;
}
-static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -3963,7 +3965,7 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
return error;
}
-static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -4015,7 +4017,7 @@ static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
return error;
}
-static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -4051,7 +4053,7 @@ static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
return error;
}
-static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -4082,7 +4084,7 @@ static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
return 0;
}
-static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -4110,7 +4112,7 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
return 0;
}
-static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
+static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
{
int len, error, ret_nodeid, from_nodeid, our_nodeid;
@@ -4130,7 +4132,7 @@ static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
send_lookup_reply(ls, ms, ret_nodeid, error);
}
-static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
+static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
{
char name[DLM_RESNAME_MAXLEN+1];
struct dlm_rsb *r;
@@ -4218,12 +4220,13 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
}
}
-static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
+static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
{
do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
}
-static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_request_reply(struct dlm_ls *ls,
+ const struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -4345,7 +4348,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
}
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
- struct dlm_message *ms, bool local)
+ const struct dlm_message *ms, bool local)
{
/* this is the value returned from do_convert() on the master */
switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
@@ -4388,8 +4391,8 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
}
}
-static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
- bool local)
+static void _receive_convert_reply(struct dlm_lkb *lkb,
+ const struct dlm_message *ms, bool local)
{
struct dlm_rsb *r = lkb->lkb_resource;
int error;
@@ -4412,7 +4415,8 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
put_rsb(r);
}
-static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_convert_reply(struct dlm_ls *ls,
+ const struct dlm_message *ms)
{
struct dlm_lkb *lkb;
int error;
@@ -4426,8 +4430,8 @@ static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
return 0;
}
-static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
- bool local)
+static void _receive_unlock_reply(struct dlm_lkb *lkb,
+ const struct dlm_message *ms, bool local)
{
struct dlm_rsb *r = lkb->lkb_resource;
int error;
@@ -4463,7 +4467,8 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
put_rsb(r);
}
-static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_unlock_reply(struct dlm_ls *ls,
+ const struct dlm_message *ms)
{
struct dlm_lkb *lkb;
int error;
@@ -4477,8 +4482,8 @@ static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
return 0;
}
-static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
- bool local)
+static void _receive_cancel_reply(struct dlm_lkb *lkb,
+ const struct dlm_message *ms, bool local)
{
struct dlm_rsb *r = lkb->lkb_resource;
int error;
@@ -4515,7 +4520,8 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
put_rsb(r);
}
-static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_cancel_reply(struct dlm_ls *ls,
+ const struct dlm_message *ms)
{
struct dlm_lkb *lkb;
int error;
@@ -4529,7 +4535,8 @@ static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
return 0;
}
-static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static void receive_lookup_reply(struct dlm_ls *ls,
+ const struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -4608,7 +4615,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
dlm_put_lkb(lkb);
}
-static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
+static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
uint32_t saved_seq)
{
int error = 0, noent = 0;
@@ -4744,7 +4751,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
requestqueue, to processing all the saved messages, to processing new
messages as they arrive. */
-static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
+static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
int nodeid)
{
if (dlm_locking_stopped(ls)) {
@@ -4767,7 +4774,7 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
/* This is called by dlm_recoverd to process messages that were saved on
the requestqueue. */
-void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
+void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
uint32_t saved_seq)
{
_receive_message(ls, ms, saved_seq);
@@ -4778,9 +4785,9 @@ void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
standard locking activity) or an RCOM (recovery message sent as part of
lockspace recovery). */
-void dlm_receive_buffer(union dlm_packet *p, int nodeid)
+void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
{
- struct dlm_header *hd = &p->header;
+ const struct dlm_header *hd = &p->header;
struct dlm_ls *ls;
int type = 0;
@@ -5334,7 +5341,7 @@ static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
/* needs at least dlm_rcom + rcom_lock */
static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
- struct dlm_rsb *r, struct dlm_rcom *rc)
+ struct dlm_rsb *r, const struct dlm_rcom *rc)
{
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
@@ -5384,7 +5391,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
back the rcom_lock struct we got but with the remid field filled in. */
/* needs at least dlm_rcom + rcom_lock */
-int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
+int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
__le32 *rl_remid, __le32 *rl_result)
{
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
@@ -5468,7 +5475,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
}
/* needs at least dlm_rcom + rcom_lock */
-int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
+int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
uint64_t seq)
{
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index cd67ccfbbf9b..b54e2cbbe6e2 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -12,11 +12,11 @@
#define __LOCK_DOT_H__
void dlm_dump_rsb(struct dlm_rsb *r);
-void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len);
+void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len);
void dlm_print_lkb(struct dlm_lkb *lkb);
-void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
+void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
uint32_t saved_seq);
-void dlm_receive_buffer(union dlm_packet *p, int nodeid);
+void dlm_receive_buffer(const union dlm_packet *p, int nodeid);
int dlm_modes_compat(int mode1, int mode2);
void dlm_put_rsb(struct dlm_rsb *r);
void dlm_hold_rsb(struct dlm_rsb *r);
@@ -25,8 +25,8 @@ void dlm_scan_rsbs(struct dlm_ls *ls);
int dlm_lock_recovery_try(struct dlm_ls *ls);
void dlm_unlock_recovery(struct dlm_ls *ls);
-int dlm_master_lookup(struct dlm_ls *ls, int nodeid, char *name, int len,
- unsigned int flags, int *r_nodeid, int *result);
+int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
+ int len, unsigned int flags, int *r_nodeid, int *result);
int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
struct dlm_rsb **r_ret);
@@ -36,9 +36,9 @@ void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
void dlm_recover_grant(struct dlm_ls *ls);
int dlm_recover_waiters_post(struct dlm_ls *ls);
void dlm_recover_waiters_pre(struct dlm_ls *ls);
-int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
+int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
__le32 *rl_remid, __le32 *rl_result);
-int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
+int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
uint64_t seq);
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode,
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index 19f3cd96f3c0..be7909ead71b 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -18,7 +18,7 @@
#include "midcomms.h"
#include "lowcomms.h"
-int dlm_slots_version(struct dlm_header *h)
+int dlm_slots_version(const struct dlm_header *h)
{
if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS)
return 0;
diff --git a/fs/dlm/member.h b/fs/dlm/member.h
index 433b2fac9f4a..f61cfde46314 100644
--- a/fs/dlm/member.h
+++ b/fs/dlm/member.h
@@ -18,7 +18,7 @@ void dlm_clear_members_gone(struct dlm_ls *ls);
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out);
int dlm_is_removed(struct dlm_ls *ls, int nodeid);
int dlm_is_member(struct dlm_ls *ls, int nodeid);
-int dlm_slots_version(struct dlm_header *h);
+int dlm_slots_version(const struct dlm_header *h);
void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
struct dlm_member *memb);
void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc);
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 8ebffbfdc00a..c125496e03bf 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -499,7 +499,8 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
spin_unlock(&node->state_lock);
}
-static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p)
+static void dlm_receive_buffer_3_2_trace(uint32_t seq,
+ const union dlm_packet *p)
{
switch (p->header.h_cmd) {
case DLM_MSG:
@@ -513,7 +514,7 @@ static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p)
}
}
-static void dlm_midcomms_receive_buffer(union dlm_packet *p,
+static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
struct midcomms_node *node,
uint32_t seq)
{
@@ -708,7 +709,8 @@ static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
return 0;
}
-static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodeid)
+static int dlm_opts_check_msglen(const union dlm_packet *p, uint16_t msglen,
+ int nodeid)
{
int len = msglen;
@@ -757,7 +759,7 @@ static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodei
return 0;
}
-static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid)
+static void dlm_midcomms_receive_buffer_3_2(const union dlm_packet *p, int nodeid)
{
uint16_t msglen = le16_to_cpu(p->header.h_length);
struct midcomms_node *node;
@@ -878,7 +880,7 @@ static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
return 0;
}
-static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
+static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid)
{
uint16_t msglen = le16_to_cpu(p->header.h_length);
struct midcomms_node *node;
@@ -977,10 +979,10 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
switch (hd->h_version) {
case cpu_to_le32(DLM_VERSION_3_1):
- dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
+ dlm_midcomms_receive_buffer_3_1((const union dlm_packet *)ptr, nodeid);
break;
case cpu_to_le32(DLM_VERSION_3_2):
- dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid);
+ dlm_midcomms_receive_buffer_3_2((const union dlm_packet *)ptr, nodeid);
break;
default:
log_print("received invalid version header: %u from node %d, will skip this message",
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 0946431e370a..6ab029149a1d 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -221,7 +221,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags,
return error;
}
-static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in,
+static void receive_rcom_status(struct dlm_ls *ls,
+ const struct dlm_rcom *rc_in,
uint64_t seq)
{
struct dlm_rcom *rc;
@@ -283,7 +284,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in,
send_rcom_stateless(msg, rc);
}
-static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
{
spin_lock(&ls->ls_rcom_spin);
if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
@@ -333,7 +334,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
return error;
}
-static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in,
+static void receive_rcom_names(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
uint64_t seq)
{
struct dlm_rcom *rc;
@@ -376,8 +377,8 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq)
return error;
}
-static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in,
- uint64_t seq)
+static void receive_rcom_lookup(struct dlm_ls *ls,
+ const struct dlm_rcom *rc_in, uint64_t seq)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
@@ -408,7 +409,8 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in,
send_rcom(mh, rc);
}
-static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+static void receive_rcom_lookup_reply(struct dlm_ls *ls,
+ const struct dlm_rcom *rc_in)
{
dlm_recover_master_reply(ls, rc_in);
}
@@ -469,7 +471,7 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq)
}
/* needs at least dlm_rcom + rcom_lock */
-static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in,
+static void receive_rcom_lock(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
uint64_t seq)
{
__le32 rl_remid, rl_result;
@@ -500,7 +502,7 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in,
/* If the lockspace doesn't exist then still send a status message
back; it's possible that it just doesn't have its global_id yet. */
-int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
+int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in)
{
struct dlm_rcom *rc;
struct rcom_config *rf;
@@ -578,7 +580,7 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
/* Called by dlm_recv; corresponds to dlm_receive_message() but special
recovery-only comms are sent through here. */
-void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
+void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid)
{
int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
int stop, reply = 0, names = 0, lookup = 0, lock = 0;
diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h
index 9dd06d43ddb4..765926ae0020 100644
--- a/fs/dlm/rcom.h
+++ b/fs/dlm/rcom.h
@@ -18,8 +18,9 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
int last_len, uint64_t seq);
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq);
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq);
-void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
-int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);
+void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc,
+ int nodeid);
+int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in);
#endif
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index ddb6b3312cc1..53917c0aa3c0 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -564,7 +564,7 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
return error;
}
-int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
+int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc)
{
struct dlm_rsb *r;
int ret_nodeid, new_master;
diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h
index c5ce2ef13934..dbc51013ecad 100644
--- a/fs/dlm/recover.h
+++ b/fs/dlm/recover.h
@@ -20,7 +20,7 @@ int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq);
-int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc);
+int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc);
int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq);
void dlm_recovered_lock(struct dlm_rsb *r);
int dlm_create_root_list(struct dlm_ls *ls);
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 8be2893ad15b..892d6ca21e74 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -30,7 +30,8 @@ struct rq_entry {
* lockspace is enabled on some while still suspended on others.
*/
-void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
+void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
+ const struct dlm_message *ms)
{
struct rq_entry *e;
int length = le16_to_cpu(ms->m_header.h_length) -
diff --git a/fs/dlm/requestqueue.h b/fs/dlm/requestqueue.h
index 4e403469a845..42bfe23ceabe 100644
--- a/fs/dlm/requestqueue.h
+++ b/fs/dlm/requestqueue.h
@@ -11,7 +11,8 @@
#ifndef __REQUESTQUEUE_DOT_H__
#define __REQUESTQUEUE_DOT_H__
-void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms);
+void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
+ const struct dlm_message *ms);
int dlm_process_requestqueue(struct dlm_ls *ls);
void dlm_wait_requestqueue(struct dlm_ls *ls);
void dlm_purge_requestqueue(struct dlm_ls *ls);
--
2.31.1
^ permalink raw reply related [flat|nested] 14+ messages in thread* [Cluster-devel] [PATCH dlm-next 12/13] fs: dlm: create midcomms nodes when configure
2023-07-27 13:22 [Cluster-devel] [PATCH dlm-next 01/13] fs: dlm: add missing spin_unlock Alexander Aring
` (9 preceding siblings ...)
2023-07-27 13:23 ` [Cluster-devel] [PATCH dlm-next 11/13] fs: dlm: constify receive buffer Alexander Aring
@ 2023-07-27 13:23 ` Alexander Aring
2023-07-27 15:24 ` Alexander Aring
2023-07-27 13:23 ` [Cluster-devel] [PATCH dlm-next 13/13] fs: dlm: don't use RCOM_NAMES for version detection Alexander Aring
11 siblings, 1 reply; 14+ messages in thread
From: Alexander Aring @ 2023-07-27 13:23 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch ties the lifetime of a midcomms node to the lifetime of the
lowcomms connection. The lowcomms connection lifetime was changed by commit
6f0b0b5d7ae7 ("fs: dlm: remove dlm_node_addrs lookup list"). In the
future the midcomms node instances can be merged with the lowcomms
connection structure, as the lifetime is the same and states can be
controlled via values or flags.
Previously, midcomms nodes were created during version detection. This is
no longer necessary now that the nodes are created when the cluster
manager configures DLM via configfs. When a midcomms node is created via
configfs, its version is set to DLM_VERSION_NOT_SET. This indicates that
the version of the midcomms node is still unknown and needs to be probed
via certain rcom messages.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/config.c | 2 +-
fs/dlm/midcomms.c | 286 +++++++++++++++++-----------------------------
fs/dlm/midcomms.h | 1 +
3 files changed, 110 insertions(+), 179 deletions(-)
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index 2beceff024e3..e55e0a2cd2e8 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -664,7 +664,7 @@ static ssize_t comm_addr_store(struct config_item *item, const char *buf,
memcpy(addr, buf, len);
- rv = dlm_lowcomms_addr(cm->nodeid, addr, len);
+ rv = dlm_midcomms_addr(cm->nodeid, addr, len);
if (rv) {
kfree(addr);
return rv;
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index c125496e03bf..24952ecf8e1a 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -330,18 +330,23 @@ static void midcomms_node_reset(struct midcomms_node *node)
wake_up(&node->shutdown_wait);
}
-static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
+static struct midcomms_node *nodeid2node(int nodeid)
{
- struct midcomms_node *node, *tmp;
- int r = nodeid_hash(nodeid);
+ return __find_node(nodeid, nodeid_hash(nodeid));
+}
+
+int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
+{
+ int ret, r = nodeid_hash(nodeid);
+ struct midcomms_node *node;
- node = __find_node(nodeid, r);
- if (node || !alloc)
- return node;
+ ret = dlm_lowcomms_addr(nodeid, addr, len);
+ if (ret)
+ return ret;
- node = kmalloc(sizeof(*node), alloc);
+ node = kmalloc(sizeof(*node), GFP_NOFS);
if (!node)
- return NULL;
+ return -ENOMEM;
node->nodeid = nodeid;
spin_lock_init(&node->state_lock);
@@ -353,21 +358,11 @@ static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
midcomms_node_reset(node);
spin_lock(&nodes_lock);
- /* check again if there was somebody else
- * earlier here to add the node
- */
- tmp = __find_node(nodeid, r);
- if (tmp) {
- spin_unlock(&nodes_lock);
- kfree(node);
- return tmp;
- }
-
hlist_add_head_rcu(&node->hlist, &node_hash[r]);
spin_unlock(&nodes_lock);
node->debugfs = dlm_create_debug_comms_file(nodeid, node);
- return node;
+ return 0;
}
static int dlm_send_ack(int nodeid, uint32_t seq)
@@ -603,112 +598,6 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
}
}
-static struct midcomms_node *
-dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p,
- uint16_t msglen, int (*cb)(struct midcomms_node *node))
-{
- struct midcomms_node *node = NULL;
- gfp_t allocation = 0;
- int ret;
-
- switch (p->header.h_cmd) {
- case DLM_RCOM:
- if (msglen < sizeof(struct dlm_rcom)) {
- log_print("rcom msg too small: %u, will skip this message from node %d",
- msglen, nodeid);
- return NULL;
- }
-
- switch (p->rcom.rc_type) {
- case cpu_to_le32(DLM_RCOM_NAMES):
- fallthrough;
- case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
- fallthrough;
- case cpu_to_le32(DLM_RCOM_STATUS):
- fallthrough;
- case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
- node = nodeid2node(nodeid, 0);
- if (node) {
- spin_lock(&node->state_lock);
- if (node->state != DLM_ESTABLISHED)
- pr_debug("receive begin RCOM msg from node %d with state %s\n",
- node->nodeid, dlm_state_str(node->state));
-
- switch (node->state) {
- case DLM_CLOSED:
- node->state = DLM_ESTABLISHED;
- pr_debug("switch node %d to state %s\n",
- node->nodeid, dlm_state_str(node->state));
- break;
- case DLM_ESTABLISHED:
- break;
- default:
- spin_unlock(&node->state_lock);
- return NULL;
- }
- spin_unlock(&node->state_lock);
- }
-
- allocation = GFP_NOFS;
- break;
- default:
- break;
- }
-
- break;
- default:
- break;
- }
-
- node = nodeid2node(nodeid, allocation);
- if (!node) {
- switch (p->header.h_cmd) {
- case DLM_OPTS:
- if (msglen < sizeof(struct dlm_opts)) {
- log_print("opts msg too small: %u, will skip this message from node %d",
- msglen, nodeid);
- return NULL;
- }
-
- log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence",
- p->opts.o_nextcmd, nodeid);
- break;
- default:
- log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence",
- p->header.h_cmd, nodeid);
- break;
- }
-
- return NULL;
- }
-
- ret = cb(node);
- if (ret < 0)
- return NULL;
-
- return node;
-}
-
-static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
-{
- switch (node->version) {
- case DLM_VERSION_NOT_SET:
- node->version = DLM_VERSION_3_2;
- wake_up(&node->shutdown_wait);
- log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
- node->nodeid);
- break;
- case DLM_VERSION_3_2:
- break;
- default:
- log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
- DLM_VERSION_3_2, node->nodeid, node->version);
- return -1;
- }
-
- return 0;
-}
-
static int dlm_opts_check_msglen(const union dlm_packet *p, uint16_t msglen,
int nodeid)
{
@@ -767,10 +656,37 @@ static void dlm_midcomms_receive_buffer_3_2(const union dlm_packet *p, int nodei
int ret, idx;
idx = srcu_read_lock(&nodes_srcu);
- node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
- dlm_midcomms_version_check_3_2);
- if (!node)
+ node = nodeid2node(nodeid);
+ if (WARN_ON_ONCE(!node))
+ goto out;
+
+ switch (node->version) {
+ case DLM_VERSION_NOT_SET:
+ node->version = DLM_VERSION_3_2;
+ wake_up(&node->shutdown_wait);
+ log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
+ node->nodeid);
+
+ spin_lock(&node->state_lock);
+ switch (node->state) {
+ case DLM_CLOSED:
+ node->state = DLM_ESTABLISHED;
+ pr_debug("switch node %d to state %s\n",
+ node->nodeid, dlm_state_str(node->state));
+ break;
+ default:
+ break;
+ }
+ spin_unlock(&node->state_lock);
+
+ break;
+ case DLM_VERSION_3_2:
+ break;
+ default:
+ log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
+ DLM_VERSION_3_2, node->nodeid, node->version);
goto out;
+ }
switch (p->header.h_cmd) {
case DLM_RCOM:
@@ -860,8 +776,19 @@ static void dlm_midcomms_receive_buffer_3_2(const union dlm_packet *p, int nodei
srcu_read_unlock(&nodes_srcu, idx);
}
-static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
+static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid)
{
+ uint16_t msglen = le16_to_cpu(p->header.h_length);
+ struct midcomms_node *node;
+ int idx;
+
+ idx = srcu_read_lock(&nodes_srcu);
+ node = nodeid2node(nodeid);
+ if (WARN_ON_ONCE(!node)) {
+ srcu_read_unlock(&nodes_srcu, idx);
+ return;
+ }
+
switch (node->version) {
case DLM_VERSION_NOT_SET:
node->version = DLM_VERSION_3_1;
@@ -874,22 +801,6 @@ static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
default:
log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
DLM_VERSION_3_1, node->nodeid, node->version);
- return -1;
- }
-
- return 0;
-}
-
-static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid)
-{
- uint16_t msglen = le16_to_cpu(p->header.h_length);
- struct midcomms_node *node;
- int idx;
-
- idx = srcu_read_lock(&nodes_srcu);
- node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
- dlm_midcomms_version_check_3_1);
- if (!node) {
srcu_read_unlock(&nodes_srcu, idx);
return;
}
@@ -1005,8 +916,8 @@ void dlm_midcomms_unack_msg_resend(int nodeid)
int idx, ret;
idx = srcu_read_lock(&nodes_srcu);
- node = nodeid2node(nodeid, 0);
- if (!node) {
+ node = nodeid2node(nodeid);
+ if (WARN_ON_ONCE(!node)) {
srcu_read_unlock(&nodes_srcu, idx);
return;
}
@@ -1092,11 +1003,9 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
int idx;
idx = srcu_read_lock(&nodes_srcu);
- node = nodeid2node(nodeid, 0);
- if (!node) {
- WARN_ON_ONCE(1);
+ node = nodeid2node(nodeid);
+ if (WARN_ON_ONCE(!node))
goto err;
- }
/* this is a bug, however we going on and hope it will be resolved */
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
@@ -1237,8 +1146,34 @@ void dlm_midcomms_init(void)
dlm_lowcomms_init();
}
+static void midcomms_node_release(struct rcu_head *rcu)
+{
+ struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
+
+ WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
+ dlm_send_queue_flush(node);
+ kfree(node);
+}
+
void dlm_midcomms_exit(void)
{
+ struct midcomms_node *node;
+ int i, idx;
+
+ idx = srcu_read_lock(&nodes_srcu);
+ for (i = 0; i < CONN_HASH_SIZE; i++) {
+ hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
+ dlm_delete_debug_comms_file(node->debugfs);
+
+ spin_lock(&nodes_lock);
+ hlist_del_rcu(&node->hlist);
+ spin_unlock(&nodes_lock);
+
+ call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
+ }
+ }
+ srcu_read_unlock(&nodes_srcu, idx);
+
dlm_lowcomms_exit();
}
@@ -1279,8 +1214,8 @@ void dlm_midcomms_add_member(int nodeid)
int idx;
idx = srcu_read_lock(&nodes_srcu);
- node = nodeid2node(nodeid, GFP_NOFS);
- if (!node) {
+ node = nodeid2node(nodeid);
+ if (WARN_ON_ONCE(!node)) {
srcu_read_unlock(&nodes_srcu, idx);
return;
}
@@ -1324,8 +1259,8 @@ void dlm_midcomms_remove_member(int nodeid)
int idx;
idx = srcu_read_lock(&nodes_srcu);
- node = nodeid2node(nodeid, 0);
- if (!node) {
+ node = nodeid2node(nodeid);
+ if (WARN_ON_ONCE(!node)) {
srcu_read_unlock(&nodes_srcu, idx);
return;
}
@@ -1369,15 +1304,6 @@ void dlm_midcomms_remove_member(int nodeid)
srcu_read_unlock(&nodes_srcu, idx);
}
-static void midcomms_node_release(struct rcu_head *rcu)
-{
- struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
-
- WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
- dlm_send_queue_flush(node);
- kfree(node);
-}
-
void dlm_midcomms_version_wait(void)
{
struct midcomms_node *node;
@@ -1440,7 +1366,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
node->state == DLM_CLOSED ||
test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
DLM_SHUTDOWN_TIMEOUT);
- if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags))
+ if (!ret)
pr_debug("active shutdown timed out for node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
else
@@ -1458,14 +1384,6 @@ void dlm_midcomms_shutdown(void)
for (i = 0; i < CONN_HASH_SIZE; i++) {
hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
midcomms_shutdown(node);
-
- dlm_delete_debug_comms_file(node->debugfs);
-
- spin_lock(&nodes_lock);
- hlist_del_rcu(&node->hlist);
- spin_unlock(&nodes_lock);
-
- call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
}
}
srcu_read_unlock(&nodes_srcu, idx);
@@ -1481,7 +1399,7 @@ int dlm_midcomms_close(int nodeid)
idx = srcu_read_lock(&nodes_srcu);
/* Abort pending close/remove operation */
- node = nodeid2node(nodeid, 0);
+ node = nodeid2node(nodeid);
if (node) {
/* let shutdown waiters leave */
set_bit(DLM_NODE_FLAG_CLOSE, &node->flags);
@@ -1493,7 +1411,7 @@ int dlm_midcomms_close(int nodeid)
mutex_lock(&close_lock);
idx = srcu_read_lock(&nodes_srcu);
- node = nodeid2node(nodeid, 0);
+ node = nodeid2node(nodeid);
if (!node) {
srcu_read_unlock(&nodes_srcu, idx);
mutex_unlock(&close_lock);
@@ -1501,9 +1419,21 @@ int dlm_midcomms_close(int nodeid)
}
ret = dlm_lowcomms_close(nodeid);
- spin_lock(&node->state_lock);
- midcomms_node_reset(node);
- spin_unlock(&node->state_lock);
+ dlm_delete_debug_comms_file(node->debugfs);
+
+ spin_lock(&nodes_lock);
+ hlist_del_rcu(&node->hlist);
+ spin_unlock(&nodes_lock);
+
+ /* wait that all readers left until flush send queue */
+ synchronize_srcu(&nodes_srcu);
+
+ /* drop all pending dlm messages, this is fine as
+ * this function get called when the node is fenced
+ */
+ dlm_send_queue_flush(node);
+
+ call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
srcu_read_unlock(&nodes_srcu, idx);
mutex_unlock(&close_lock);
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index 9f8c9605013d..e7246fb3ef57 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -20,6 +20,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
gfp_t allocation, char **ppc);
void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
int namelen);
+int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
void dlm_midcomms_version_wait(void);
int dlm_midcomms_close(int nodeid);
int dlm_midcomms_start(void);
--
2.31.1
^ permalink raw reply related [flat|nested] 14+ messages in thread* [Cluster-devel] [PATCH dlm-next 12/13] fs: dlm: create midcomms nodes when configure
2023-07-27 13:23 ` [Cluster-devel] [PATCH dlm-next 12/13] fs: dlm: create midcomms nodes when configure Alexander Aring
@ 2023-07-27 15:24 ` Alexander Aring
0 siblings, 0 replies; 14+ messages in thread
From: Alexander Aring @ 2023-07-27 15:24 UTC (permalink / raw)
To: cluster-devel.redhat.com
Hi,
On Thu, Jul 27, 2023 at 9:23 AM Alexander Aring <aahringo@redhat.com> wrote:
>
> This patch puts the life of a midcomms node the same as a lowcomms
> connection. The lowcomms connection lifetime was changed by commit
> 6f0b0b5d7ae7 ("fs: dlm: remove dlm_node_addrs lookup list"). In the
> future the midcomms node instances can be merged with lowcomms
> connection structure as the lifetime is the same and states can be
> controlled over values or flags.
>
> Before midcomms nodes were generated during version detection. This is
> not necessary anymore when the nodes are created when the cluster
> manager configures DLM via configfs. When a midcomms node is created over
> configfs it will set DLM_VERSION_NOT_SET as version. This indicates that
> the version of the midcomms node is still unknown and needs to be probed
> via certain rcom messages.
>
> Signed-off-by: Alexander Aring <aahringo@redhat.com>
> ---
> fs/dlm/config.c | 2 +-
> fs/dlm/midcomms.c | 286 +++++++++++++++++-----------------------------
> fs/dlm/midcomms.h | 1 +
> 3 files changed, 110 insertions(+), 179 deletions(-)
>
> diff --git a/fs/dlm/config.c b/fs/dlm/config.c
> index 2beceff024e3..e55e0a2cd2e8 100644
> --- a/fs/dlm/config.c
> +++ b/fs/dlm/config.c
> @@ -664,7 +664,7 @@ static ssize_t comm_addr_store(struct config_item *item, const char *buf,
>
> memcpy(addr, buf, len);
>
> - rv = dlm_lowcomms_addr(cm->nodeid, addr, len);
> + rv = dlm_midcomms_addr(cm->nodeid, addr, len);
> if (rv) {
> kfree(addr);
> return rv;
> diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
> index c125496e03bf..24952ecf8e1a 100644
> --- a/fs/dlm/midcomms.c
> +++ b/fs/dlm/midcomms.c
> @@ -330,18 +330,23 @@ static void midcomms_node_reset(struct midcomms_node *node)
> wake_up(&node->shutdown_wait);
> }
>
> -static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
> +static struct midcomms_node *nodeid2node(int nodeid)
> {
> - struct midcomms_node *node, *tmp;
> - int r = nodeid_hash(nodeid);
> + return __find_node(nodeid, nodeid_hash(nodeid));
> +}
> +
> +int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
> +{
> + int ret, r = nodeid_hash(nodeid);
> + struct midcomms_node *node;
>
> - node = __find_node(nodeid, r);
> - if (node || !alloc)
> - return node;
> + ret = dlm_lowcomms_addr(nodeid, addr, len);
> + if (ret)
> + return ret;
>
> - node = kmalloc(sizeof(*node), alloc);
> + node = kmalloc(sizeof(*node), GFP_NOFS);
> if (!node)
> - return NULL;
> + return -ENOMEM;
>
> node->nodeid = nodeid;
> spin_lock_init(&node->state_lock);
> @@ -353,21 +358,11 @@ static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
> midcomms_node_reset(node);
>
> spin_lock(&nodes_lock);
> - /* check again if there was somebody else
> - * earlier here to add the node
> - */
> - tmp = __find_node(nodeid, r);
> - if (tmp) {
> - spin_unlock(&nodes_lock);
> - kfree(node);
> - return tmp;
> - }
> -
> hlist_add_head_rcu(&node->hlist, &node_hash[r]);
> spin_unlock(&nodes_lock);
>
> node->debugfs = dlm_create_debug_comms_file(nodeid, node);
> - return node;
> + return 0;
> }
>
> static int dlm_send_ack(int nodeid, uint32_t seq)
> @@ -603,112 +598,6 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
> }
> }
>
> -static struct midcomms_node *
> -dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p,
> - uint16_t msglen, int (*cb)(struct midcomms_node *node))
> -{
> - struct midcomms_node *node = NULL;
> - gfp_t allocation = 0;
> - int ret;
> -
> - switch (p->header.h_cmd) {
> - case DLM_RCOM:
> - if (msglen < sizeof(struct dlm_rcom)) {
> - log_print("rcom msg too small: %u, will skip this message from node %d",
> - msglen, nodeid);
> - return NULL;
> - }
> -
> - switch (p->rcom.rc_type) {
> - case cpu_to_le32(DLM_RCOM_NAMES):
> - fallthrough;
> - case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
> - fallthrough;
> - case cpu_to_le32(DLM_RCOM_STATUS):
> - fallthrough;
> - case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
> - node = nodeid2node(nodeid, 0);
> - if (node) {
> - spin_lock(&node->state_lock);
> - if (node->state != DLM_ESTABLISHED)
> - pr_debug("receive begin RCOM msg from node %d with state %s\n",
> - node->nodeid, dlm_state_str(node->state));
> -
> - switch (node->state) {
> - case DLM_CLOSED:
> - node->state = DLM_ESTABLISHED;
> - pr_debug("switch node %d to state %s\n",
> - node->nodeid, dlm_state_str(node->state));
> - break;
> - case DLM_ESTABLISHED:
> - break;
> - default:
> - spin_unlock(&node->state_lock);
> - return NULL;
> - }
> - spin_unlock(&node->state_lock);
> - }
> -
> - allocation = GFP_NOFS;
> - break;
> - default:
> - break;
> - }
> -
> - break;
> - default:
> - break;
> - }
> -
> - node = nodeid2node(nodeid, allocation);
> - if (!node) {
> - switch (p->header.h_cmd) {
> - case DLM_OPTS:
> - if (msglen < sizeof(struct dlm_opts)) {
> - log_print("opts msg too small: %u, will skip this message from node %d",
> - msglen, nodeid);
> - return NULL;
> - }
> -
> - log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence",
> - p->opts.o_nextcmd, nodeid);
> - break;
> - default:
> - log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence",
> - p->header.h_cmd, nodeid);
> - break;
> - }
> -
> - return NULL;
> - }
> -
> - ret = cb(node);
> - if (ret < 0)
> - return NULL;
> -
> - return node;
> -}
> -
> -static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
> -{
> - switch (node->version) {
> - case DLM_VERSION_NOT_SET:
> - node->version = DLM_VERSION_3_2;
> - wake_up(&node->shutdown_wait);
> - log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
> - node->nodeid);
> - break;
> - case DLM_VERSION_3_2:
> - break;
> - default:
> - log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
> - DLM_VERSION_3_2, node->nodeid, node->version);
> - return -1;
> - }
> -
> - return 0;
> -}
> -
> static int dlm_opts_check_msglen(const union dlm_packet *p, uint16_t msglen,
> int nodeid)
> {
> @@ -767,10 +656,37 @@ static void dlm_midcomms_receive_buffer_3_2(const union dlm_packet *p, int nodei
> int ret, idx;
>
> idx = srcu_read_lock(&nodes_srcu);
> - node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
> - dlm_midcomms_version_check_3_2);
> - if (!node)
> + node = nodeid2node(nodeid);
> + if (WARN_ON_ONCE(!node))
> + goto out;
> +
> + switch (node->version) {
> + case DLM_VERSION_NOT_SET:
> + node->version = DLM_VERSION_3_2;
> + wake_up(&node->shutdown_wait);
> + log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
> + node->nodeid);
> +
> + spin_lock(&node->state_lock);
> + switch (node->state) {
> + case DLM_CLOSED:
> + node->state = DLM_ESTABLISHED;
> + pr_debug("switch node %d to state %s\n",
> + node->nodeid, dlm_state_str(node->state));
> + break;
> + default:
> + break;
> + }
> + spin_unlock(&node->state_lock);
> +
> + break;
> + case DLM_VERSION_3_2:
> + break;
> + default:
> + log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
> + DLM_VERSION_3_2, node->nodeid, node->version);
> goto out;
> + }
>
> switch (p->header.h_cmd) {
> case DLM_RCOM:
> @@ -860,8 +776,19 @@ static void dlm_midcomms_receive_buffer_3_2(const union dlm_packet *p, int nodei
> srcu_read_unlock(&nodes_srcu, idx);
> }
>
> -static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
> +static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid)
> {
> + uint16_t msglen = le16_to_cpu(p->header.h_length);
> + struct midcomms_node *node;
> + int idx;
> +
> + idx = srcu_read_lock(&nodes_srcu);
> + node = nodeid2node(nodeid);
> + if (WARN_ON_ONCE(!node)) {
> + srcu_read_unlock(&nodes_srcu, idx);
> + return;
> + }
> +
> switch (node->version) {
> case DLM_VERSION_NOT_SET:
> node->version = DLM_VERSION_3_1;
> @@ -874,22 +801,6 @@ static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
> default:
> log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
> DLM_VERSION_3_1, node->nodeid, node->version);
> - return -1;
> - }
> -
> - return 0;
> -}
> -
> -static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid)
> -{
> - uint16_t msglen = le16_to_cpu(p->header.h_length);
> - struct midcomms_node *node;
> - int idx;
> -
> - idx = srcu_read_lock(&nodes_srcu);
> - node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
> - dlm_midcomms_version_check_3_1);
> - if (!node) {
> srcu_read_unlock(&nodes_srcu, idx);
> return;
> }
> @@ -1005,8 +916,8 @@ void dlm_midcomms_unack_msg_resend(int nodeid)
> int idx, ret;
>
> idx = srcu_read_lock(&nodes_srcu);
> - node = nodeid2node(nodeid, 0);
> - if (!node) {
> + node = nodeid2node(nodeid);
> + if (WARN_ON_ONCE(!node)) {
> srcu_read_unlock(&nodes_srcu, idx);
> return;
> }
> @@ -1092,11 +1003,9 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
> int idx;
>
> idx = srcu_read_lock(&nodes_srcu);
> - node = nodeid2node(nodeid, 0);
> - if (!node) {
> - WARN_ON_ONCE(1);
> + node = nodeid2node(nodeid);
> + if (WARN_ON_ONCE(!node))
> goto err;
> - }
>
> /* this is a bug, however we going on and hope it will be resolved */
> WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
> @@ -1237,8 +1146,34 @@ void dlm_midcomms_init(void)
> dlm_lowcomms_init();
> }
>
> +static void midcomms_node_release(struct rcu_head *rcu)
> +{
> + struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
> +
> + WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
> + dlm_send_queue_flush(node);
> + kfree(node);
> +}
> +
> void dlm_midcomms_exit(void)
> {
> + struct midcomms_node *node;
> + int i, idx;
> +
> + idx = srcu_read_lock(&nodes_srcu);
> + for (i = 0; i < CONN_HASH_SIZE; i++) {
> + hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
> + dlm_delete_debug_comms_file(node->debugfs);
> +
> + spin_lock(&nodes_lock);
> + hlist_del_rcu(&node->hlist);
> + spin_unlock(&nodes_lock);
> +
> + call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
> + }
> + }
> + srcu_read_unlock(&nodes_srcu, idx);
> +
> dlm_lowcomms_exit();
> }
>
> @@ -1279,8 +1214,8 @@ void dlm_midcomms_add_member(int nodeid)
> int idx;
>
> idx = srcu_read_lock(&nodes_srcu);
> - node = nodeid2node(nodeid, GFP_NOFS);
> - if (!node) {
> + node = nodeid2node(nodeid);
> + if (WARN_ON_ONCE(!node)) {
> srcu_read_unlock(&nodes_srcu, idx);
> return;
> }
> @@ -1324,8 +1259,8 @@ void dlm_midcomms_remove_member(int nodeid)
> int idx;
>
> idx = srcu_read_lock(&nodes_srcu);
> - node = nodeid2node(nodeid, 0);
> - if (!node) {
> + node = nodeid2node(nodeid);
> + if (WARN_ON_ONCE(!node)) {
> srcu_read_unlock(&nodes_srcu, idx);
> return;
> }
> @@ -1369,15 +1304,6 @@ void dlm_midcomms_remove_member(int nodeid)
> srcu_read_unlock(&nodes_srcu, idx);
> }
>
> -static void midcomms_node_release(struct rcu_head *rcu)
> -{
> - struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
> -
> - WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
> - dlm_send_queue_flush(node);
> - kfree(node);
> -}
> -
> void dlm_midcomms_version_wait(void)
> {
> struct midcomms_node *node;
> @@ -1440,7 +1366,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
> node->state == DLM_CLOSED ||
> test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
> DLM_SHUTDOWN_TIMEOUT);
> - if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags))
> + if (!ret)
> pr_debug("active shutdown timed out for node %d with state %s\n",
> node->nodeid, dlm_state_str(node->state));
> else
> @@ -1458,14 +1384,6 @@ void dlm_midcomms_shutdown(void)
> for (i = 0; i < CONN_HASH_SIZE; i++) {
> hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
> midcomms_shutdown(node);
> -
> - dlm_delete_debug_comms_file(node->debugfs);
> -
> - spin_lock(&nodes_lock);
> - hlist_del_rcu(&node->hlist);
> - spin_unlock(&nodes_lock);
> -
> - call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
> }
> }
> srcu_read_unlock(&nodes_srcu, idx);
> @@ -1481,7 +1399,7 @@ int dlm_midcomms_close(int nodeid)
>
> idx = srcu_read_lock(&nodes_srcu);
> /* Abort pending close/remove operation */
> - node = nodeid2node(nodeid, 0);
> + node = nodeid2node(nodeid);
> if (node) {
> /* let shutdown waiters leave */
> set_bit(DLM_NODE_FLAG_CLOSE, &node->flags);
> @@ -1493,7 +1411,7 @@ int dlm_midcomms_close(int nodeid)
>
> mutex_lock(&close_lock);
> idx = srcu_read_lock(&nodes_srcu);
> - node = nodeid2node(nodeid, 0);
> + node = nodeid2node(nodeid);
> if (!node) {
> srcu_read_unlock(&nodes_srcu, idx);
> mutex_unlock(&close_lock);
> @@ -1501,9 +1419,21 @@ int dlm_midcomms_close(int nodeid)
> }
>
> ret = dlm_lowcomms_close(nodeid);
> - spin_lock(&node->state_lock);
> - midcomms_node_reset(node);
> - spin_unlock(&node->state_lock);
> + dlm_delete_debug_comms_file(node->debugfs);
> +
> + spin_lock(&nodes_lock);
> + hlist_del_rcu(&node->hlist);
> + spin_unlock(&nodes_lock);
> +
> + /* wait that all readers left until flush send queue */
> + synchronize_srcu(&nodes_srcu);
> +
This will obviously deadlock, because synchronize_srcu() is called while
still holding the nodes_srcu read lock. I have a small fix for it: move
srcu_read_unlock(&nodes_srcu, idx); to right after the node is removed
from the hash.
- Alex
^ permalink raw reply [flat|nested] 14+ messages in thread
* [Cluster-devel] [PATCH dlm-next 13/13] fs: dlm: don't use RCOM_NAMES for version detection
2023-07-27 13:22 [Cluster-devel] [PATCH dlm-next 01/13] fs: dlm: add missing spin_unlock Alexander Aring
` (10 preceding siblings ...)
2023-07-27 13:23 ` [Cluster-devel] [PATCH dlm-next 12/13] fs: dlm: create midcomms nodes when configure Alexander Aring
@ 2023-07-27 13:23 ` Alexander Aring
11 siblings, 0 replies; 14+ messages in thread
From: Alexander Aring @ 2023-07-27 13:23 UTC (permalink / raw)
To: cluster-devel.redhat.com
Currently RCOM_STATUS and RCOM_NAMES, including their replies, are being
used to determine the DLM version. The RCOM_NAMES messages are triggered
in DLM recovery only when calling dlm_recover_directory(). At that point
the DLM version needs to have been determined already. I ran some tests
and did not experience any issues. When the DLM version detection was
developed, I probably once ran into a case with RCOM_NAMES where the
version was not detected yet. However, it seems not to be necessary.
For backwards compatibility we still need to accept RCOM_NAMES messages
which are not protected by the DLM message reliability layer, aka
stateless messages. With this patch, the RCOM_NAMES messages we send out
are no longer stateless.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/rcom.c | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 6ab029149a1d..3b734aed26b5 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -308,15 +308,15 @@ static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
int last_len, uint64_t seq)
{
+ struct dlm_mhandle *mh;
struct dlm_rcom *rc;
- struct dlm_msg *msg;
int error = 0;
ls->ls_recover_nodeid = nodeid;
retry:
- error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len,
- &rc, &msg, seq);
+ error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len,
+ &rc, &mh, seq);
if (error)
goto out;
memcpy(rc->rc_buf, last_name, last_len);
@@ -324,7 +324,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
allow_sync_reply(ls, &rc->rc_id);
memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
- send_rcom_stateless(msg, rc);
+ send_rcom(mh, rc);
error = dlm_wait_function(ls, &rcom_response);
disallow_sync_reply(ls);
@@ -337,17 +337,17 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
static void receive_rcom_names(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
uint64_t seq)
{
+ struct dlm_mhandle *mh;
struct dlm_rcom *rc;
int error, inlen, outlen, nodeid;
- struct dlm_msg *msg;
nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
inlen = le16_to_cpu(rc_in->rc_header.h_length) -
sizeof(struct dlm_rcom);
outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
- error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
- &rc, &msg, seq);
+ error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
+ &rc, &mh, seq);
if (error)
return;
rc->rc_id = rc_in->rc_id;
@@ -355,7 +355,7 @@ static void receive_rcom_names(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
nodeid);
- send_rcom_stateless(msg, rc);
+ send_rcom(mh, rc);
}
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq)
--
2.31.1
^ permalink raw reply related [flat|nested] 14+ messages in thread