* [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning
@ 2022-11-17 22:11 Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 02/18] fs: dlm: drop lkb ref in bug case Alexander Aring
` (16 more replies)
0 siblings, 17 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch avoids a false-positive checker warning about writing 112
bytes into the 88-byte field "e->request", see:
[ 54.891560] dlm: csmb1: dlm_recover_directory 23 out 2 messages
[ 54.990542] ------------[ cut here ]------------
[ 54.991012] memcpy: detected field-spanning write (size 112) of single field "&e->request" at fs/dlm/requestqueue.c:47 (size 88)
[ 54.992150] WARNING: CPU: 0 PID: 297 at fs/dlm/requestqueue.c:47 dlm_add_requestqueue+0x177/0x180
[ 54.993002] CPU: 0 PID: 297 Comm: kworker/u4:3 Not tainted 6.1.0-rc5-00008-ge01d50cbd6ee #248
[ 54.993878] Hardware name: QEMU Standard PC (Q35 + ICH9, 2009), BIOS 1.16.0-1.fc36 04/01/2014
[ 54.994718] Workqueue: dlm_recv process_recv_sockets
[ 54.995230] RIP: 0010:dlm_add_requestqueue+0x177/0x180
[ 54.995731] Code: e7 01 0f 85 3b ff ff ff b9 58 00 00 00 48 c7 c2 c0 41 74 82 4c 89 ee 48 c7 c7 20 42 74 82 c6 05 8b 8d 30 02 01 e8 51 07 be 00 <0f> 0b e9 12 ff ff ff 66 90 0f 1f 44 00 00 41 57 48 8d 87 10 08 00
[ 54.997483] RSP: 0018:ffffc90000b1fbe8 EFLAGS: 00010282
[ 54.997990] RAX: 0000000000000000 RBX: ffff888024fc3d00 RCX: 0000000000000000
[ 54.998667] RDX: 0000000000000001 RSI: ffffffff81155014 RDI: fffff52000163f73
[ 54.999342] RBP: ffff88800dbac000 R08: 0000000000000001 R09: ffffc90000b1fa5f
[ 54.999997] R10: fffff52000163f4b R11: 203a7970636d656d R12: ffff88800cfb0018
[ 55.000673] R13: 0000000000000070 R14: ffff888024fc3d18 R15: 0000000000000000
[ 55.001344] FS: 0000000000000000(0000) GS:ffff88806d600000(0000) knlGS:0000000000000000
[ 55.002078] CS: 0010 DS: 0000 ES: 0000 CR0: 0000000080050033
[ 55.002603] CR2: 00007f35d4f0b9a0 CR3: 0000000025495002 CR4: 0000000000770ef0
[ 55.003258] PKRU: 55555554
[ 55.003514] Call Trace:
[ 55.003756] <TASK>
[ 55.003953] dlm_receive_buffer+0x1c0/0x200
[ 55.004348] dlm_process_incoming_buffer+0x46d/0x780
[ 55.004786] ? kernel_recvmsg+0x8b/0xc0
[ 55.005150] receive_from_sock.isra.0+0x168/0x420
[ 55.005582] ? process_listen_recv_socket+0x10/0x10
[ 55.006018] ? finish_task_switch.isra.0+0xe0/0x400
[ 55.006469] ? __switch_to+0x2fe/0x6a0
[ 55.006808] ? read_word_at_a_time+0xe/0x20
[ 55.007197] ? strscpy+0x146/0x190
[ 55.007505] process_one_work+0x3d0/0x6b0
[ 55.007863] worker_thread+0x8d/0x620
[ 55.008209] ? __kthread_parkme+0xd8/0xf0
[ 55.008565] ? process_one_work+0x6b0/0x6b0
[ 55.008937] kthread+0x171/0x1a0
[ 55.009251] ? kthread_exit+0x60/0x60
[ 55.009582] ret_from_fork+0x1f/0x30
[ 55.009903] </TASK>
[ 55.010120] ---[ end trace 0000000000000000 ]---
[ 55.025783] dlm: csmb1: dlm_recover 5 generation 3 done: 201 ms
[ 55.026466] gfs2: fsid=smbcluster:csmb1.0: recover generation 3 done
It seems the checker is unable to account for the additional bytes that
were allocated for the flexible array in struct dlm_message. To solve it
we split the memcpy() into one copy for the 88-byte fixed struct and
another memcpy() for the flexible array field m_extra.
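As a rough sketch of the pattern (hypothetical struct and names, not the
exact dlm code), the split looks like this:

  struct msg {
          struct hdr header;     /* fixed-size part, here 88 bytes */
          unsigned char extra[]; /* flexible array member */
  };

  /* one memcpy() over header plus payload trips the field-spanning
   * write check, so copy the fixed part and the trailing payload
   * separately:
   */
  memcpy(dst, src, sizeof(*src));            /* fixed part only */
  memcpy(dst->extra, src->extra, extra_len); /* flexible array part */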
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/requestqueue.c | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 036a9a0078f6..8be2893ad15b 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -44,7 +44,8 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF;
e->nodeid = nodeid;
- memcpy(&e->request, ms, le16_to_cpu(ms->m_header.h_length));
+ memcpy(&e->request, ms, sizeof(*ms));
+ memcpy(&e->request.m_extra, ms->m_extra, length);
atomic_inc(&ls->ls_requestqueue_cnt);
mutex_lock(&ls->ls_requestqueue_mutex);
--
2.31.1
* [Cluster-devel] [PATCH dlm/next 02/18] fs: dlm: drop lkb ref in bug case
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 03/18] fs: dlm: ast do WARN_ON_ONCE() on hotpath Alexander Aring
` (15 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch drops the lkb reference in a very unlikely case that should
never happen in practice. However, if it does happen, we clean up the
reference just in case.
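Condensed from the diff below, the reference pairing this fixes:
dlm_add_callback takes a kref_get() on the lkb, and every exit path of
the work function has to drop it:

  rv = dlm_dequeue_lkb_callback(lkb, &cb);
  if (WARN_ON(rv == DLM_DEQUEUE_CALLBACK_EMPTY))
          goto out;   /* was "return", which leaked the reference */
  /* ... callback processing loop ... */
  out:
  /* undo kref_get from dlm_add_callback, may cause lkb to be freed */
  dlm_put_lkb(lkb);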
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/ast.c | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 078bbbd43a53..982d093b80cd 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -186,7 +186,7 @@ void dlm_callback_work(struct work_struct *work)
spin_unlock(&lkb->lkb_cb_lock);
if (WARN_ON(rv == DLM_DEQUEUE_CALLBACK_EMPTY))
- return;
+ goto out;
for (;;) {
castfn = lkb->lkb_astfn;
@@ -217,6 +217,7 @@ void dlm_callback_work(struct work_struct *work)
spin_unlock(&lkb->lkb_cb_lock);
}
+out:
/* undo kref_get from dlm_add_callback, may cause lkb to be freed */
dlm_put_lkb(lkb);
}
--
2.31.1
* [Cluster-devel] [PATCH dlm/next 03/18] fs: dlm: ast do WARN_ON_ONCE() on hotpath
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 02/18] fs: dlm: drop lkb ref in bug case Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 04/18] fs: dlm: rename DLM_IFL_NEED_SCHED to DLM_IFL_CB_PENDING Alexander Aring
` (14 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch changes the ast hotpath to use WARN_ON_ONCE() instead of
WARN_ON() for very unlikely cases, so that we do not spam the console
output if we run into states that would otherwise trigger over and over
again.
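As a minimal sketch (the loop and the condition are placeholders, not
dlm code):

  while (handle_next_event()) {
          /* WARN_ON() prints a full splat every time the condition
           * is true; on a hotpath that can flood the console.
           * WARN_ON_ONCE() warns only on the first hit.
           */
          WARN_ON_ONCE(unexpected_state);
  }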
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/ast.c | 6 +++---
fs/dlm/user.c | 8 ++++----
2 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 982d093b80cd..7dd072b8ea38 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -161,12 +161,12 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
spin_unlock(&ls->ls_cb_lock);
break;
case DLM_ENQUEUE_CALLBACK_FAILURE:
- WARN_ON(1);
+ WARN_ON_ONCE(1);
break;
case DLM_ENQUEUE_CALLBACK_SUCCESS:
break;
default:
- WARN_ON(1);
+ WARN_ON_ONCE(1);
break;
}
spin_unlock(&lkb->lkb_cb_lock);
@@ -185,7 +185,7 @@ void dlm_callback_work(struct work_struct *work)
rv = dlm_dequeue_lkb_callback(lkb, &cb);
spin_unlock(&lkb->lkb_cb_lock);
- if (WARN_ON(rv == DLM_DEQUEUE_CALLBACK_EMPTY))
+ if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY))
goto out;
for (;;) {
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 6b530db4bc0b..edf722d6935a 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -214,7 +214,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
switch (rv) {
case DLM_ENQUEUE_CALLBACK_FAILURE:
spin_unlock(&proc->asts_spin);
- WARN_ON(1);
+ WARN_ON_ONCE(1);
goto out;
case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
kref_get(&lkb->lkb_ref);
@@ -224,7 +224,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
case DLM_ENQUEUE_CALLBACK_SUCCESS:
break;
default:
- WARN_ON(1);
+ WARN_ON_ONCE(1);
break;
}
spin_unlock(&proc->asts_spin);
@@ -880,7 +880,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
spin_unlock(&proc->asts_spin);
/* removes ref for proc->asts, may cause lkb to be freed */
dlm_put_lkb(lkb);
- WARN_ON(1);
+ WARN_ON_ONCE(1);
goto try_another;
case DLM_DEQUEUE_CALLBACK_LAST:
list_del_init(&lkb->lkb_cb_list);
@@ -890,7 +890,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
case DLM_DEQUEUE_CALLBACK_SUCCESS:
break;
default:
- WARN_ON(1);
+ WARN_ON_ONCE(1);
break;
}
spin_unlock(&proc->asts_spin);
--
2.31.1
* [Cluster-devel] [PATCH dlm/next 04/18] fs: dlm: rename DLM_IFL_NEED_SCHED to DLM_IFL_CB_PENDING
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 02/18] fs: dlm: drop lkb ref in bug case Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 03/18] fs: dlm: ast do WARN_ON_ONCE() on hotpath Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 05/18] fs: dlm: rename seq to h_seq for msg tracing Alexander Aring
` (13 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch renames DLM_IFL_NEED_SCHED to DLM_IFL_CB_PENDING because
CB_PENDING describes this flag better. The flag is set when the callback
enqueue returns DLM_ENQUEUE_CALLBACK_NEED_SCHED because the callback
worker needs to be queued. It signals that callbacks are currently
pending to be called and is cleared once the callback work for the
specific lkb is done. "Need schedule" only describes part of that
lifetime; the proper meaning is that there are callbacks pending to be
called.
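The intended flag lifetime, summarized from the enqueue/dequeue paths
touched below:

  /* DLM_IFL_CB_PENDING lifetime (sketch):
   *
   *  enqueue, queue was empty:   set flag under lkb_cb_lock, return
   *                              DLM_ENQUEUE_CALLBACK_NEED_SCHED so
   *                              the caller queues the callback work
   *  enqueue, flag already set:  just append the new callback
   *  work drains queue to empty: clear flag under lkb_cb_lock
   */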
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/ast.c | 9 ++++-----
fs/dlm/dlm_internal.h | 2 +-
fs/dlm/user.c | 3 +--
3 files changed, 6 insertions(+), 8 deletions(-)
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 7dd072b8ea38..26fef9945cc9 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -45,8 +45,7 @@ void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
kref_put(&cb->ref, dlm_release_callback);
}
- /* TODO */
- lkb->lkb_flags &= ~DLM_IFL_NEED_SCHED;
+ lkb->lkb_flags &= ~DLM_IFL_CB_PENDING;
/* invalidate */
dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
@@ -104,8 +103,8 @@ int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
cb->sb_status = status;
cb->sb_flags = (sbflags & 0x000000FF);
kref_init(&cb->ref);
- if (!(lkb->lkb_flags & DLM_IFL_NEED_SCHED)) {
- lkb->lkb_flags |= DLM_IFL_NEED_SCHED;
+ if (!(lkb->lkb_flags & DLM_IFL_CB_PENDING)) {
+ lkb->lkb_flags |= DLM_IFL_CB_PENDING;
rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
}
list_add_tail(&cb->list, &lkb->lkb_callbacks);
@@ -210,7 +209,7 @@ void dlm_callback_work(struct work_struct *work)
spin_lock(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) {
- lkb->lkb_flags &= ~DLM_IFL_NEED_SCHED;
+ lkb->lkb_flags &= ~DLM_IFL_CB_PENDING;
spin_unlock(&lkb->lkb_cb_lock);
break;
}
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 6318e0f51bc9..ab1a55337a6e 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -211,7 +211,7 @@ struct dlm_args {
#endif
#define DLM_IFL_DEADLOCK_CANCEL 0x01000000
#define DLM_IFL_STUB_MS 0x02000000 /* magic number for m_flags */
-#define DLM_IFL_NEED_SCHED 0x04000000
+#define DLM_IFL_CB_PENDING 0x04000000
/* least significant 2 bytes are message changed, they are full transmitted
* but at receive side only the 2 bytes LSB will be set.
*
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index edf722d6935a..35129505ddda 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -884,8 +884,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
goto try_another;
case DLM_DEQUEUE_CALLBACK_LAST:
list_del_init(&lkb->lkb_cb_list);
- /* TODO */
- lkb->lkb_flags &= ~DLM_IFL_NEED_SCHED;
+ lkb->lkb_flags &= ~DLM_IFL_CB_PENDING;
break;
case DLM_DEQUEUE_CALLBACK_SUCCESS:
break;
--
2.31.1
* [Cluster-devel] [PATCH dlm/next 05/18] fs: dlm: rename seq to h_seq for msg tracing
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (2 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 04/18] fs: dlm: rename DLM_IFL_NEED_SCHED to DLM_IFL_CB_PENDING Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 06/18] fs: dlm: add dst nodeid " Alexander Aring
` (12 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch renames seq to h_seq, matching how the field is named in the
dlm header structure.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
include/trace/events/dlm.h | 44 +++++++++++++++++++-------------------
1 file changed, 22 insertions(+), 22 deletions(-)
diff --git a/include/trace/events/dlm.h b/include/trace/events/dlm.h
index 4ec47828d55e..212f30aec7cf 100644
--- a/include/trace/events/dlm.h
+++ b/include/trace/events/dlm.h
@@ -342,12 +342,12 @@ TRACE_EVENT(dlm_unlock_end,
DECLARE_EVENT_CLASS(dlm_rcom_template,
- TP_PROTO(uint32_t seq, const struct dlm_rcom *rc),
+ TP_PROTO(uint32_t h_seq, const struct dlm_rcom *rc),
- TP_ARGS(seq, rc),
+ TP_ARGS(h_seq, rc),
TP_STRUCT__entry(
- __field(uint32_t, seq)
+ __field(uint32_t, h_seq)
__field(uint32_t, h_version)
__field(uint32_t, h_lockspace)
__field(uint32_t, h_nodeid)
@@ -363,7 +363,7 @@ DECLARE_EVENT_CLASS(dlm_rcom_template,
),
TP_fast_assign(
- __entry->seq = seq;
+ __entry->h_seq = h_seq;
__entry->h_version = le32_to_cpu(rc->rc_header.h_version);
__entry->h_lockspace = le32_to_cpu(rc->rc_header.u.h_lockspace);
__entry->h_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
@@ -378,10 +378,10 @@ DECLARE_EVENT_CLASS(dlm_rcom_template,
__get_dynamic_array_len(rc_buf));
),
- TP_printk("seq=%u, h_version=%s h_lockspace=%u h_nodeid=%u "
+ TP_printk("h_seq=%u h_version=%s h_lockspace=%u h_nodeid=%u "
"h_length=%u h_cmd=%s rc_type=%s rc_result=%d "
"rc_id=%llu rc_seq=%llu rc_seq_reply=%llu "
- "rc_buf=0x%s", __entry->seq,
+ "rc_buf=0x%s", __entry->h_seq,
show_message_version(__entry->h_version),
__entry->h_lockspace, __entry->h_nodeid, __entry->h_length,
show_header_cmd(__entry->h_cmd),
@@ -394,22 +394,22 @@ DECLARE_EVENT_CLASS(dlm_rcom_template,
);
DEFINE_EVENT(dlm_rcom_template, dlm_send_rcom,
- TP_PROTO(uint32_t seq, const struct dlm_rcom *rc),
- TP_ARGS(seq, rc));
+ TP_PROTO(uint32_t h_seq, const struct dlm_rcom *rc),
+ TP_ARGS(h_seq, rc));
DEFINE_EVENT(dlm_rcom_template, dlm_recv_rcom,
- TP_PROTO(uint32_t seq, const struct dlm_rcom *rc),
- TP_ARGS(seq, rc));
+ TP_PROTO(uint32_t h_seq, const struct dlm_rcom *rc),
+ TP_ARGS(h_seq, rc));
TRACE_EVENT(dlm_send_message,
- TP_PROTO(uint32_t seq, const struct dlm_message *ms,
+ TP_PROTO(uint32_t h_seq, const struct dlm_message *ms,
const void *name, int namelen),
- TP_ARGS(seq, ms, name, namelen),
+ TP_ARGS(h_seq, ms, name, namelen),
TP_STRUCT__entry(
- __field(uint32_t, seq)
+ __field(uint32_t, h_seq)
__field(uint32_t, h_version)
__field(uint32_t, h_lockspace)
__field(uint32_t, h_nodeid)
@@ -439,7 +439,7 @@ TRACE_EVENT(dlm_send_message,
),
TP_fast_assign(
- __entry->seq = seq;
+ __entry->h_seq = h_seq;
__entry->h_version = le32_to_cpu(ms->m_header.h_version);
__entry->h_lockspace = le32_to_cpu(ms->m_header.u.h_lockspace);
__entry->h_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
@@ -469,14 +469,14 @@ TRACE_EVENT(dlm_send_message,
__get_dynamic_array_len(res_name));
),
- TP_printk("seq=%u h_version=%s h_lockspace=%u h_nodeid=%u "
+ TP_printk("h_seq=%u h_version=%s h_lockspace=%u h_nodeid=%u "
"h_length=%u h_cmd=%s m_type=%s m_nodeid=%u "
"m_pid=%u m_lkid=%u m_remid=%u m_parent_lkid=%u "
"m_parent_remid=%u m_exflags=%s m_sbflags=%s m_flags=%s "
"m_lvbseq=%u m_hash=%u m_status=%d m_grmode=%s "
"m_rqmode=%s m_bastmode=%s m_asts=%d m_result=%d "
"m_extra=0x%s res_name=0x%s",
- __entry->seq, show_message_version(__entry->h_version),
+ __entry->h_seq, show_message_version(__entry->h_version),
__entry->h_lockspace, __entry->h_nodeid, __entry->h_length,
show_header_cmd(__entry->h_cmd),
show_message_type(__entry->m_type),
@@ -499,12 +499,12 @@ TRACE_EVENT(dlm_send_message,
TRACE_EVENT(dlm_recv_message,
- TP_PROTO(uint32_t seq, const struct dlm_message *ms),
+ TP_PROTO(uint32_t h_seq, const struct dlm_message *ms),
- TP_ARGS(seq, ms),
+ TP_ARGS(h_seq, ms),
TP_STRUCT__entry(
- __field(uint32_t, seq)
+ __field(uint32_t, h_seq)
__field(uint32_t, h_version)
__field(uint32_t, h_lockspace)
__field(uint32_t, h_nodeid)
@@ -533,7 +533,7 @@ TRACE_EVENT(dlm_recv_message,
),
TP_fast_assign(
- __entry->seq = seq;
+ __entry->h_seq = h_seq;
__entry->h_version = le32_to_cpu(ms->m_header.h_version);
__entry->h_lockspace = le32_to_cpu(ms->m_header.u.h_lockspace);
__entry->h_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
@@ -561,14 +561,14 @@ TRACE_EVENT(dlm_recv_message,
__get_dynamic_array_len(m_extra));
),
- TP_printk("seq=%u h_version=%s h_lockspace=%u h_nodeid=%u "
+ TP_printk("h_seq=%u h_version=%s h_lockspace=%u h_nodeid=%u "
"h_length=%u h_cmd=%s m_type=%s m_nodeid=%u "
"m_pid=%u m_lkid=%u m_remid=%u m_parent_lkid=%u "
"m_parent_remid=%u m_exflags=%s m_sbflags=%s m_flags=%s "
"m_lvbseq=%u m_hash=%u m_status=%d m_grmode=%s "
"m_rqmode=%s m_bastmode=%s m_asts=%d m_result=%d "
"m_extra=0x%s",
- __entry->seq, show_message_version(__entry->h_version),
+ __entry->h_seq, show_message_version(__entry->h_version),
__entry->h_lockspace, __entry->h_nodeid, __entry->h_length,
show_header_cmd(__entry->h_cmd),
show_message_type(__entry->m_type),
--
2.31.1
* [Cluster-devel] [PATCH dlm/next 06/18] fs: dlm: add dst nodeid for msg tracing
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (3 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 05/18] fs: dlm: rename seq to h_seq for msg tracing Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 07/18] fs: dlm: add midcomms init/start functions Alexander Aring
` (11 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
In DLM, when we send a dlm message it is easy to add the lock resource
name, but an additional lookup would be required to trace the receive
side. The idea here is to move that lookup work to the user, who can
match the right send message with its receive message. Note that DLM
can't drop any message, which is guaranteed by a special session layer.
For doing the lookup a 3-tuple is required as a unique identification:
dst nodeid, src nodeid and sequence number. This patch adds the
destination nodeid to the dlm message trace points. The source nodeid is
given by the h_nodeid field inside the header.
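A trace consumer could pair events with a key like the following; this
struct is hypothetical and only illustrates the 3-tuple, dlm itself does
not define it:

  struct dlm_msg_trace_key {
          uint32_t dst_nodeid; /* new "dst" field in the trace event */
          uint32_t src_nodeid; /* h_nodeid from the dlm header */
          uint32_t seq;        /* h_seq */
  };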
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/midcomms.c | 10 ++++++----
include/trace/events/dlm.h | 38 ++++++++++++++++++++++----------------
2 files changed, 28 insertions(+), 20 deletions(-)
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 32194a750fe1..960def5ab530 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -479,10 +479,10 @@ static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p)
{
switch (p->header.h_cmd) {
case DLM_MSG:
- trace_dlm_recv_message(seq, &p->message);
+ trace_dlm_recv_message(dlm_our_nodeid(), seq, &p->message);
break;
case DLM_RCOM:
- trace_dlm_recv_rcom(seq, &p->rcom);
+ trace_dlm_recv_rcom(dlm_our_nodeid(), seq, &p->rcom);
break;
default:
break;
@@ -1151,11 +1151,13 @@ static void dlm_midcomms_commit_msg_3_2_trace(const struct dlm_mhandle *mh,
{
switch (mh->inner_p->header.h_cmd) {
case DLM_MSG:
- trace_dlm_send_message(mh->seq, &mh->inner_p->message,
+ trace_dlm_send_message(mh->node->nodeid, mh->seq,
+ &mh->inner_p->message,
name, namelen);
break;
case DLM_RCOM:
- trace_dlm_send_rcom(mh->seq, &mh->inner_p->rcom);
+ trace_dlm_send_rcom(mh->node->nodeid, mh->seq,
+ &mh->inner_p->rcom);
break;
default:
/* nothing to trace */
diff --git a/include/trace/events/dlm.h b/include/trace/events/dlm.h
index 212f30aec7cf..37eb79e29b28 100644
--- a/include/trace/events/dlm.h
+++ b/include/trace/events/dlm.h
@@ -342,11 +342,12 @@ TRACE_EVENT(dlm_unlock_end,
DECLARE_EVENT_CLASS(dlm_rcom_template,
- TP_PROTO(uint32_t h_seq, const struct dlm_rcom *rc),
+ TP_PROTO(uint32_t dst, uint32_t h_seq, const struct dlm_rcom *rc),
- TP_ARGS(h_seq, rc),
+ TP_ARGS(dst, h_seq, rc),
TP_STRUCT__entry(
+ __field(uint32_t, dst)
__field(uint32_t, h_seq)
__field(uint32_t, h_version)
__field(uint32_t, h_lockspace)
@@ -363,6 +364,7 @@ DECLARE_EVENT_CLASS(dlm_rcom_template,
),
TP_fast_assign(
+ __entry->dst = dst;
__entry->h_seq = h_seq;
__entry->h_version = le32_to_cpu(rc->rc_header.h_version);
__entry->h_lockspace = le32_to_cpu(rc->rc_header.u.h_lockspace);
@@ -378,10 +380,10 @@ DECLARE_EVENT_CLASS(dlm_rcom_template,
__get_dynamic_array_len(rc_buf));
),
- TP_printk("h_seq=%u h_version=%s h_lockspace=%u h_nodeid=%u "
+ TP_printk("dst=%u h_seq=%u h_version=%s h_lockspace=%u h_nodeid=%u "
"h_length=%u h_cmd=%s rc_type=%s rc_result=%d "
"rc_id=%llu rc_seq=%llu rc_seq_reply=%llu "
- "rc_buf=0x%s", __entry->h_seq,
+ "rc_buf=0x%s", __entry->dst, __entry->h_seq,
show_message_version(__entry->h_version),
__entry->h_lockspace, __entry->h_nodeid, __entry->h_length,
show_header_cmd(__entry->h_cmd),
@@ -394,21 +396,22 @@ DECLARE_EVENT_CLASS(dlm_rcom_template,
);
DEFINE_EVENT(dlm_rcom_template, dlm_send_rcom,
- TP_PROTO(uint32_t h_seq, const struct dlm_rcom *rc),
- TP_ARGS(h_seq, rc));
+ TP_PROTO(uint32_t dst, uint32_t h_seq, const struct dlm_rcom *rc),
+ TP_ARGS(dst, h_seq, rc));
DEFINE_EVENT(dlm_rcom_template, dlm_recv_rcom,
- TP_PROTO(uint32_t h_seq, const struct dlm_rcom *rc),
- TP_ARGS(h_seq, rc));
+ TP_PROTO(uint32_t dst, uint32_t h_seq, const struct dlm_rcom *rc),
+ TP_ARGS(dst, h_seq, rc));
TRACE_EVENT(dlm_send_message,
- TP_PROTO(uint32_t h_seq, const struct dlm_message *ms,
+ TP_PROTO(uint32_t dst, uint32_t h_seq, const struct dlm_message *ms,
const void *name, int namelen),
- TP_ARGS(h_seq, ms, name, namelen),
+ TP_ARGS(dst, h_seq, ms, name, namelen),
TP_STRUCT__entry(
+ __field(uint32_t, dst)
__field(uint32_t, h_seq)
__field(uint32_t, h_version)
__field(uint32_t, h_lockspace)
@@ -439,6 +442,7 @@ TRACE_EVENT(dlm_send_message,
),
TP_fast_assign(
+ __entry->dst = dst;
__entry->h_seq = h_seq;
__entry->h_version = le32_to_cpu(ms->m_header.h_version);
__entry->h_lockspace = le32_to_cpu(ms->m_header.u.h_lockspace);
@@ -469,13 +473,13 @@ TRACE_EVENT(dlm_send_message,
__get_dynamic_array_len(res_name));
),
- TP_printk("h_seq=%u h_version=%s h_lockspace=%u h_nodeid=%u "
+ TP_printk("dst=%u h_seq=%u h_version=%s h_lockspace=%u h_nodeid=%u "
"h_length=%u h_cmd=%s m_type=%s m_nodeid=%u "
"m_pid=%u m_lkid=%u m_remid=%u m_parent_lkid=%u "
"m_parent_remid=%u m_exflags=%s m_sbflags=%s m_flags=%s "
"m_lvbseq=%u m_hash=%u m_status=%d m_grmode=%s "
"m_rqmode=%s m_bastmode=%s m_asts=%d m_result=%d "
- "m_extra=0x%s res_name=0x%s",
+ "m_extra=0x%s res_name=0x%s", __entry->dst,
__entry->h_seq, show_message_version(__entry->h_version),
__entry->h_lockspace, __entry->h_nodeid, __entry->h_length,
show_header_cmd(__entry->h_cmd),
@@ -499,11 +503,12 @@ TRACE_EVENT(dlm_send_message,
TRACE_EVENT(dlm_recv_message,
- TP_PROTO(uint32_t h_seq, const struct dlm_message *ms),
+ TP_PROTO(uint32_t dst, uint32_t h_seq, const struct dlm_message *ms),
- TP_ARGS(h_seq, ms),
+ TP_ARGS(dst, h_seq, ms),
TP_STRUCT__entry(
+ __field(uint32_t, dst)
__field(uint32_t, h_seq)
__field(uint32_t, h_version)
__field(uint32_t, h_lockspace)
@@ -533,6 +538,7 @@ TRACE_EVENT(dlm_recv_message,
),
TP_fast_assign(
+ __entry->dst = dst;
__entry->h_seq = h_seq;
__entry->h_version = le32_to_cpu(ms->m_header.h_version);
__entry->h_lockspace = le32_to_cpu(ms->m_header.u.h_lockspace);
@@ -561,13 +567,13 @@ TRACE_EVENT(dlm_recv_message,
__get_dynamic_array_len(m_extra));
),
- TP_printk("h_seq=%u h_version=%s h_lockspace=%u h_nodeid=%u "
+ TP_printk("dst=%u h_seq=%u h_version=%s h_lockspace=%u h_nodeid=%u "
"h_length=%u h_cmd=%s m_type=%s m_nodeid=%u "
"m_pid=%u m_lkid=%u m_remid=%u m_parent_lkid=%u "
"m_parent_remid=%u m_exflags=%s m_sbflags=%s m_flags=%s "
"m_lvbseq=%u m_hash=%u m_status=%d m_grmode=%s "
"m_rqmode=%s m_bastmode=%s m_asts=%d m_result=%d "
- "m_extra=0x%s",
+ "m_extra=0x%s", __entry->dst,
__entry->h_seq, show_message_version(__entry->h_version),
__entry->h_lockspace, __entry->h_nodeid, __entry->h_length,
show_header_cmd(__entry->h_cmd),
--
2.31.1
* [Cluster-devel] [PATCH dlm/next 07/18] fs: dlm: add midcomms init/start functions
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (4 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 06/18] fs: dlm: add dst nodeid " Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 08/18] fs: dlm: remove twice INIT_WORK Alexander Aring
` (10 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch introduces the missing init, stop and exit counterparts to
the existing start functionality. The dlm application layer should
always call the midcomms layer, which becomes aware of such events and
redirects them to the lowcomms layer. Some functionality currently
handled inside the start functions of midcomms and lowcomms should be
handled in the init functions instead, as it only needs to be
initialized once when dlm is loaded.
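The resulting layering, sketched as the intended call order (the
lockspace-start line is an assumption from how dlm_midcomms_start() is
used, it is not part of this diff):

  /* module load:     dlm_midcomms_init()  -> dlm_lowcomms_init()
   * lockspace start: dlm_midcomms_start() -> dlm_lowcomms_start()
   * last lockspace:  dlm_midcomms_shutdown() and
   *                  dlm_midcomms_stop()  -> dlm_lowcomms_stop()
   * module unload:   dlm_midcomms_exit()  -> dlm_lowcomms_exit()
   */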
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lockspace.c | 5 ++---
fs/dlm/lowcomms.c | 16 ++++++++++------
fs/dlm/lowcomms.h | 1 +
fs/dlm/main.c | 7 +++++--
fs/dlm/midcomms.c | 17 ++++++++++++++++-
fs/dlm/midcomms.h | 3 +++
6 files changed, 37 insertions(+), 12 deletions(-)
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 96fe8f7ca09c..d0b4e2181a5f 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -17,7 +17,6 @@
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
-#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
@@ -723,7 +722,7 @@ static int __dlm_new_lockspace(const char *name, const char *cluster,
if (!ls_count) {
dlm_scand_stop();
dlm_midcomms_shutdown();
- dlm_lowcomms_stop();
+ dlm_midcomms_stop();
}
out:
mutex_unlock(&ls_lock);
@@ -926,7 +925,7 @@ int dlm_release_lockspace(void *lockspace, int force)
if (!error)
ls_count--;
if (!ls_count)
- dlm_lowcomms_stop();
+ dlm_midcomms_stop();
mutex_unlock(&ls_lock);
return error;
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index b05c6d9b5102..0dcb69cc456a 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1987,10 +1987,6 @@ static const struct dlm_proto_ops dlm_sctp_ops = {
int dlm_lowcomms_start(void)
{
int error = -EINVAL;
- int i;
-
- for (i = 0; i < CONN_HASH_SIZE; i++)
- INIT_HLIST_HEAD(&connection_hash[i]);
init_local();
if (!dlm_local_count) {
@@ -1999,8 +1995,6 @@ int dlm_lowcomms_start(void)
goto fail;
}
- INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
-
error = work_start();
if (error)
goto fail_local;
@@ -2039,6 +2033,16 @@ int dlm_lowcomms_start(void)
return error;
}
+void dlm_lowcomms_init(void)
+{
+ int i;
+
+ for (i = 0; i < CONN_HASH_SIZE; i++)
+ INIT_HLIST_HEAD(&connection_hash[i]);
+
+ INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
+}
+
void dlm_lowcomms_exit(void)
{
struct dlm_node_addr *na, *safe;
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 29369feea991..bbce7a18416d 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -35,6 +35,7 @@ extern int dlm_allow_conn;
int dlm_lowcomms_start(void);
void dlm_lowcomms_shutdown(void);
void dlm_lowcomms_stop(void);
+void dlm_lowcomms_init(void);
void dlm_lowcomms_exit(void);
int dlm_lowcomms_close(int nodeid);
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
diff --git a/fs/dlm/main.c b/fs/dlm/main.c
index 1c5be4b70ac1..a77338be3237 100644
--- a/fs/dlm/main.c
+++ b/fs/dlm/main.c
@@ -17,7 +17,7 @@
#include "user.h"
#include "memory.h"
#include "config.h"
-#include "lowcomms.h"
+#include "midcomms.h"
#define CREATE_TRACE_POINTS
#include <trace/events/dlm.h>
@@ -30,6 +30,8 @@ static int __init init_dlm(void)
if (error)
goto out;
+ dlm_midcomms_init();
+
error = dlm_lockspace_init();
if (error)
goto out_mem;
@@ -66,6 +68,7 @@ static int __init init_dlm(void)
out_lockspace:
dlm_lockspace_exit();
out_mem:
+ dlm_midcomms_exit();
dlm_memory_exit();
out:
return error;
@@ -79,7 +82,7 @@ static void __exit exit_dlm(void)
dlm_config_exit();
dlm_memory_exit();
dlm_lockspace_exit();
- dlm_lowcomms_exit();
+ dlm_midcomms_exit();
dlm_unregister_debugfs();
}
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 960def5ab530..6b4c72fb0171 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -1205,13 +1205,28 @@ void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh,
#endif
int dlm_midcomms_start(void)
+{
+ return dlm_lowcomms_start();
+}
+
+void dlm_midcomms_stop(void)
+{
+ dlm_lowcomms_stop();
+}
+
+void dlm_midcomms_init(void)
{
int i;
for (i = 0; i < CONN_HASH_SIZE; i++)
INIT_HLIST_HEAD(&node_hash[i]);
- return dlm_lowcomms_start();
+ dlm_lowcomms_init();
+}
+
+void dlm_midcomms_exit(void)
+{
+ dlm_lowcomms_exit();
}
static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index d6286e80b077..69296552d5ad 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -21,6 +21,9 @@ void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
int namelen);
int dlm_midcomms_close(int nodeid);
int dlm_midcomms_start(void);
+void dlm_midcomms_stop(void);
+void dlm_midcomms_init(void);
+void dlm_midcomms_exit(void);
void dlm_midcomms_shutdown(void);
void dlm_midcomms_add_member(int nodeid);
void dlm_midcomms_remove_member(int nodeid);
--
2.31.1
* [Cluster-devel] [PATCH dlm/next 08/18] fs: dlm: remove twice INIT_WORK
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (5 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 07/18] fs: dlm: add midcomms init/start functions Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 09/18] fs: dlm: use list_first_entry_or_null Alexander Aring
` (9 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch removes a duplicated INIT_WORK(). We already do this inside
dlm_lowcomms_init(), which is called only once when dlm is loaded.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 1 -
1 file changed, 1 deletion(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 0dcb69cc456a..3106e7f87344 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1825,7 +1825,6 @@ static int dlm_listen_for_all(void)
save_listen_callbacks(sock);
add_listen_sock(sock, &listen_con);
- INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
result = sock->ops->listen(sock, 5);
if (result < 0) {
dlm_close_sock(&listen_con.sock);
--
2.31.1
* [Cluster-devel] [PATCH dlm/next 09/18] fs: dlm: use list_first_entry_or_null
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (6 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 08/18] fs: dlm: remove twice INIT_WORK Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 10/18] fs: dlm: use listen sock as dlm running indicator Alexander Aring
` (8 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
Instead of checking list_empty() we can do the same with
list_first_entry_or_null() and return NULL if no entry was returned.
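For reference, the helper behaves like this (head and struct foo are
placeholders):

  struct foo *f;

  /* returns the first entry, or NULL when the list is empty,
   * so no separate list_empty() check is needed
   */
  f = list_first_entry_or_null(&head, struct foo, list);
  if (!f)
          return NULL;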
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 9 +++------
1 file changed, 3 insertions(+), 6 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 3106e7f87344..d3302b10b37e 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -214,15 +214,12 @@ static struct writequeue_entry *con_next_wq(struct connection *con)
{
struct writequeue_entry *e;
- if (list_empty(&con->writequeue))
- return NULL;
-
- e = list_first_entry(&con->writequeue, struct writequeue_entry,
- list);
+ e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
+ list);
/* if len is zero nothing is to send, if there are users filling
* buffers we wait until the users are done so we can send more.
*/
- if (e->users || e->len == 0)
+ if (!e || e->users || e->len == 0)
return NULL;
return e;
--
2.31.1
* [Cluster-devel] [PATCH dlm/next 10/18] fs: dlm: use listen sock as dlm running indicator
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (7 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 09/18] fs: dlm: use list_first_entry_or_null Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 11/18] fs: dlm: remove socket shutdown handling Alexander Aring
` (7 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch switches the check for whether dlm lowcomms is running from
the dlm_allow_conn flag to whether we actually have a listen socket set.
The listen socket is set and unset in the lowcomms start and shutdown
functionality. To synchronize with the data_ready() callback we reset
the socket callback while the socket lock is held.
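Condensed from the diff below, the new running check:

  bool dlm_lowcomms_is_running(void)
  {
          /* set in dlm_lowcomms_start(), cleared in shutdown */
          return !!listen_con.sock;
  }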
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/config.c | 4 ++--
fs/dlm/lowcomms.c | 17 ++++++-----------
fs/dlm/lowcomms.h | 4 ++--
3 files changed, 10 insertions(+), 15 deletions(-)
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index ac8b62106ce0..20b60709eccf 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -183,7 +183,7 @@ static int dlm_check_protocol_and_dlm_running(unsigned int x)
return -EINVAL;
}
- if (dlm_allow_conn)
+ if (dlm_lowcomms_is_running())
return -EBUSY;
return 0;
@@ -194,7 +194,7 @@ static int dlm_check_zero_and_dlm_running(unsigned int x)
if (!x)
return -EINVAL;
- if (dlm_allow_conn)
+ if (dlm_lowcomms_is_running())
return -EBUSY;
return 0;
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index d3302b10b37e..d9001d40aa6e 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -176,7 +176,6 @@ static DEFINE_SPINLOCK(dlm_node_addrs_spin);
static struct listen_connection listen_con;
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
-int dlm_allow_conn;
/* Work queues */
static struct workqueue_struct *recv_workqueue;
@@ -191,6 +190,11 @@ static const struct dlm_proto_ops *dlm_proto_ops;
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
+bool dlm_lowcomms_is_running(void)
+{
+ return !!listen_con.sock;
+}
+
static void writequeue_entry_ctor(void *data)
{
struct writequeue_entry *entry = data;
@@ -511,9 +515,6 @@ static void lowcomms_data_ready(struct sock *sk)
static void lowcomms_listen_data_ready(struct sock *sk)
{
- if (!dlm_allow_conn)
- return;
-
queue_work(recv_workqueue, &listen_con.rwork);
}
@@ -1689,10 +1690,7 @@ void dlm_lowcomms_shutdown(void)
{
int idx;
- /* Set all the flags to prevent any
- * socket activity.
- */
- dlm_allow_conn = 0;
+ restore_callbacks(listen_con.sock);
if (recv_workqueue)
flush_workqueue(recv_workqueue);
@@ -1995,8 +1993,6 @@ int dlm_lowcomms_start(void)
if (error)
goto fail_local;
- dlm_allow_conn = 1;
-
/* Start listening */
switch (dlm_config.ci_protocol) {
case DLM_PROTO_TCP:
@@ -2021,7 +2017,6 @@ int dlm_lowcomms_start(void)
fail_listen:
dlm_proto_ops = NULL;
fail_proto_ops:
- dlm_allow_conn = 0;
work_stop();
fail_local:
deinit_local();
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index bbce7a18416d..acaf1b0b3885 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -29,8 +29,8 @@ static inline int nodeid_hash(int nodeid)
return nodeid & (CONN_HASH_SIZE-1);
}
-/* switch to check if dlm is running */
-extern int dlm_allow_conn;
+/* check if dlm is running */
+bool dlm_lowcomms_is_running(void);
int dlm_lowcomms_start(void);
void dlm_lowcomms_shutdown(void);
--
2.31.1
* [Cluster-devel] [PATCH dlm/next 11/18] fs: dlm: remove socket shutdown handling
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (8 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 10/18] fs: dlm: use listen sock as dlm running indicator Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 12/18] fs: dlm: cleanup listen sock handling Alexander Aring
` (6 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
Since commit 489d8e559c65 ("fs: dlm: add reliable connection if
reconnect") we have functionality at the dlm application protocol layer
like TCP offers for half-closed sockets. This feature is required
because a cluster manager event about leaving a resource membership may
already have occurred locally while other cluster nodes still have the
leave pending in the cluster manager protocol. During this time the
local dlm node has already shut down its connection and does not
transmit any new dlm messages, but it still needs to be able to accept
dlm messages because of the pending leave request of the cluster manager
protocol, which the dlm kernel implementation has no control over.
We have this functionality at the application layer for two reasons. The
main reason is that SCTP does not support such functionality at the
socket layer, but we can implement it inside the application layer.
Another small issue is that this feature is broken in the TCP world
because some NAT devices do not implement it correctly. This is the same
reason why the reliable connection session layer in DLM exists. We give
up on middle devices in the network that send out e.g. TCP resets. In
DLM we cannot have any message dropping, and we ensure over a session
layer that it can't happen.
Back to the half-closed graceful shutdown handling: it is no longer
necessary to do it at the socket layer (which is only supported for TCP
sockets) because we do it at the application layer. This patch removes
this handling; if there are still issues, then we have a problem in the
application layer handling.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 127 ++++++++--------------------------------------
fs/dlm/lowcomms.h | 1 +
fs/dlm/midcomms.c | 6 ++-
3 files changed, 27 insertions(+), 107 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index d9001d40aa6e..e33bee1beba2 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -65,7 +65,6 @@
/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
-#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
struct connection {
struct socket *sock; /* NULL if not connected */
@@ -79,14 +78,11 @@ struct connection {
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
-#define CF_SHUTDOWN 9
-#define CF_CONNECTED 10
-#define CF_RECONNECT 11
-#define CF_DELAY_CONNECT 12
-#define CF_EOF 13
+#define CF_CONNECTED 9
+#define CF_RECONNECT 10
+#define CF_DELAY_CONNECT 11
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
- atomic_t writequeue_cnt;
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
@@ -94,7 +90,6 @@ struct connection {
struct connection *sendcon;
struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */
- wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
unsigned char *rx_buf;
int rx_buflen;
int rx_leftover;
@@ -157,10 +152,6 @@ struct dlm_proto_ops {
int (*listen_validate)(void);
void (*listen_sockopts)(struct socket *sock);
int (*listen_bind)(struct socket *sock);
- /* What to do to shutdown */
- void (*shutdown_action)(struct connection *con);
- /* What to do to eof check */
- bool (*eof_condition)(struct connection *con);
};
static struct listen_sock_callbacks {
@@ -241,11 +232,6 @@ static struct connection *__find_con(int nodeid, int r)
return NULL;
}
-static bool tcp_eof_condition(struct connection *con)
-{
- return atomic_read(&con->writequeue_cnt);
-}
-
static int dlm_con_init(struct connection *con, int nodeid)
{
con->rx_buflen = dlm_config.ci_buffer_size;
@@ -257,10 +243,8 @@ static int dlm_con_init(struct connection *con, int nodeid)
mutex_init(&con->sock_mutex);
INIT_LIST_HEAD(&con->writequeue);
spin_lock_init(&con->writequeue_lock);
- atomic_set(&con->writequeue_cnt, 0);
INIT_WORK(&con->swork, process_send_sockets);
INIT_WORK(&con->rwork, process_recv_sockets);
- init_waitqueue_head(&con->shutdown_wait);
return 0;
}
@@ -771,7 +755,6 @@ static void free_entry(struct writequeue_entry *e)
}
list_del(&e->list);
- atomic_dec(&e->con->writequeue_cnt);
kref_put(&e->ref, dlm_page_release);
}
@@ -834,56 +817,10 @@ static void close_connection(struct connection *con, bool and_other,
clear_bit(CF_CONNECTED, &con->flags);
clear_bit(CF_DELAY_CONNECT, &con->flags);
clear_bit(CF_RECONNECT, &con->flags);
- clear_bit(CF_EOF, &con->flags);
mutex_unlock(&con->sock_mutex);
clear_bit(CF_CLOSING, &con->flags);
}
-static void shutdown_connection(struct connection *con)
-{
- int ret;
-
- flush_work(&con->swork);
-
- mutex_lock(&con->sock_mutex);
- /* nothing to shutdown */
- if (!con->sock) {
- mutex_unlock(&con->sock_mutex);
- return;
- }
-
- set_bit(CF_SHUTDOWN, &con->flags);
- ret = kernel_sock_shutdown(con->sock, SHUT_WR);
- mutex_unlock(&con->sock_mutex);
- if (ret) {
- log_print("Connection %p failed to shutdown: %d will force close",
- con, ret);
- goto force_close;
- } else {
- ret = wait_event_timeout(con->shutdown_wait,
- !test_bit(CF_SHUTDOWN, &con->flags),
- DLM_SHUTDOWN_WAIT_TIMEOUT);
- if (ret == 0) {
- log_print("Connection %p shutdown timed out, will force close",
- con);
- goto force_close;
- }
- }
-
- return;
-
-force_close:
- clear_bit(CF_SHUTDOWN, &con->flags);
- close_connection(con, false, true, true);
-}
-
-static void dlm_tcp_shutdown(struct connection *con)
-{
- if (con->othercon)
- shutdown_connection(con->othercon);
- shutdown_connection(con);
-}
-
static int con_realloc_receive_buf(struct connection *con, int newlen)
{
unsigned char *newbuf;
@@ -975,19 +912,8 @@ static int receive_from_sock(struct connection *con)
log_print("connection %p got EOF from %d",
con, con->nodeid);
- if (dlm_proto_ops->eof_condition &&
- dlm_proto_ops->eof_condition(con)) {
- set_bit(CF_EOF, &con->flags);
- mutex_unlock(&con->sock_mutex);
- } else {
- mutex_unlock(&con->sock_mutex);
- close_connection(con, false, true, false);
-
- /* handling for tcp shutdown */
- clear_bit(CF_SHUTDOWN, &con->flags);
- wake_up(&con->shutdown_wait);
- }
-
+ mutex_unlock(&con->sock_mutex);
+ close_connection(con, false, true, false);
/* signal to breaking receive worker */
ret = -1;
} else {
@@ -1261,7 +1187,6 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
kref_get(&e->ref);
*ppc = page_address(e->page);
e->end += len;
- atomic_inc(&con->writequeue_cnt);
if (cb)
cb(data);
@@ -1467,20 +1392,6 @@ static void send_to_sock(struct connection *con)
}
spin_unlock(&con->writequeue_lock);
- /* close if we got EOF */
- if (test_and_clear_bit(CF_EOF, &con->flags)) {
- mutex_unlock(&con->sock_mutex);
- close_connection(con, false, false, true);
-
- /* handling for tcp shutdown */
- clear_bit(CF_SHUTDOWN, &con->flags);
- wake_up(&con->shutdown_wait);
- } else {
- mutex_unlock(&con->sock_mutex);
- }
-
- return;
-
out:
mutex_unlock(&con->sock_mutex);
return;
@@ -1680,16 +1591,8 @@ static int work_start(void)
return 0;
}
-static void shutdown_conn(struct connection *con)
-{
- if (dlm_proto_ops->shutdown_action)
- dlm_proto_ops->shutdown_action(con);
-}
-
void dlm_lowcomms_shutdown(void)
{
- int idx;
-
restore_callbacks(listen_con.sock);
if (recv_workqueue)
@@ -1698,9 +1601,25 @@ void dlm_lowcomms_shutdown(void)
flush_workqueue(send_workqueue);
dlm_close_sock(&listen_con.sock);
+}
+
+void dlm_lowcomms_shutdown_node(int nodeid, bool force)
+{
+ struct connection *con;
+ int idx;
idx = srcu_read_lock(&connections_srcu);
- foreach_conn(shutdown_conn);
+ con = nodeid2con(nodeid, 0);
+ if (WARN_ON_ONCE(!con)) {
+ srcu_read_unlock(&connections_srcu, idx);
+ return;
+ }
+
+ WARN_ON_ONCE(!force && !list_empty(&con->writequeue));
+ clean_one_writequeue(con);
+ if (con->othercon)
+ clean_one_writequeue(con->othercon);
+ close_connection(con, true, true, true);
srcu_read_unlock(&connections_srcu, idx);
}
@@ -1912,8 +1831,6 @@ static const struct dlm_proto_ops dlm_tcp_ops = {
.listen_validate = dlm_tcp_listen_validate,
.listen_sockopts = dlm_tcp_listen_sockopts,
.listen_bind = dlm_tcp_listen_bind,
- .shutdown_action = dlm_tcp_shutdown,
- .eof_condition = tcp_eof_condition,
};
static int dlm_sctp_bind(struct socket *sock)
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index acaf1b0b3885..3e8dca66183b 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -34,6 +34,7 @@ bool dlm_lowcomms_is_running(void);
int dlm_lowcomms_start(void);
void dlm_lowcomms_shutdown(void);
+void dlm_lowcomms_shutdown_node(int nodeid, bool force);
void dlm_lowcomms_stop(void);
void dlm_lowcomms_init(void);
void dlm_lowcomms_exit(void);
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 6b4c72fb0171..b0e8bdcaab1b 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -1426,11 +1426,13 @@ static void midcomms_shutdown(struct midcomms_node *node)
pr_debug("active shutdown timed out for node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
midcomms_node_reset(node);
+ dlm_lowcomms_shutdown_node(node->nodeid, true);
return;
}
pr_debug("active shutdown done for node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
+ dlm_lowcomms_shutdown_node(node->nodeid, false);
}
void dlm_midcomms_shutdown(void)
@@ -1438,6 +1440,8 @@ void dlm_midcomms_shutdown(void)
struct midcomms_node *node;
int i, idx;
+ dlm_lowcomms_shutdown();
+
mutex_lock(&close_lock);
idx = srcu_read_lock(&nodes_srcu);
for (i = 0; i < CONN_HASH_SIZE; i++) {
@@ -1455,8 +1459,6 @@ void dlm_midcomms_shutdown(void)
}
srcu_read_unlock(&nodes_srcu, idx);
mutex_unlock(&close_lock);
-
- dlm_lowcomms_shutdown();
}
int dlm_midcomms_close(int nodeid)
--
2.31.1
* [Cluster-devel] [PATCH dlm/next 12/18] fs: dlm: cleanup listen sock handling
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (9 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 11/18] fs: dlm: remove socket shutdown handling Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 13/18] fs: dlm: don't put dlm_local_addrs on heap Alexander Aring
` (5 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch removes save_listen_callbacks() and add_listen_sock() as each
is only used once in the lowcomms functionality. For lowcomms shutdown
it is not necessary to flush the whole workqueues to synchronize with
restoring the old sk_data_ready() callback; only the listen connection
receive work needs to be cancelled. For each individual node shutdown we
should make sure the last ack has been transmitted, which is done by
flushing the per-connection swork worker.
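The resulting shutdown ordering, condensed from the diff below:

  /* stop lowcomms_listen_data_ready calls */
  lock_sock(listen_con.sock->sk);
  listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
  release_sock(listen_con.sock->sk);

  cancel_work_sync(&listen_con.rwork); /* no new queueing possible */
  dlm_close_sock(&listen_con.sock);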
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 51 ++++++++++++++++-------------------------------
1 file changed, 17 insertions(+), 34 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index e33bee1beba2..8bf09c3a0946 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -647,17 +647,6 @@ static void lowcomms_error_report(struct sock *sk)
orig_report(sk);
}
-/* Note: sk_callback_lock must be locked before calling this function. */
-static void save_listen_callbacks(struct socket *sock)
-{
- struct sock *sk = sock->sk;
-
- listen_sock.sk_data_ready = sk->sk_data_ready;
- listen_sock.sk_state_change = sk->sk_state_change;
- listen_sock.sk_write_space = sk->sk_write_space;
- listen_sock.sk_error_report = sk->sk_error_report;
-}
-
static void restore_callbacks(struct socket *sock)
{
struct sock *sk = sock->sk;
@@ -671,21 +660,6 @@ static void restore_callbacks(struct socket *sock)
release_sock(sk);
}
-static void add_listen_sock(struct socket *sock, struct listen_connection *con)
-{
- struct sock *sk = sock->sk;
-
- lock_sock(sk);
- save_listen_callbacks(sock);
- con->sock = sock;
-
- sk->sk_user_data = con;
- sk->sk_allocation = GFP_NOFS;
- /* Install a data_ready callback */
- sk->sk_data_ready = lowcomms_listen_data_ready;
- release_sock(sk);
-}
-
/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
@@ -1593,13 +1567,12 @@ static int work_start(void)
void dlm_lowcomms_shutdown(void)
{
- restore_callbacks(listen_con.sock);
-
- if (recv_workqueue)
- flush_workqueue(recv_workqueue);
- if (send_workqueue)
- flush_workqueue(send_workqueue);
+ /* stop lowcomms_listen_data_ready calls */
+ lock_sock(listen_con.sock->sk);
+ listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
+ release_sock(listen_con.sock->sk);
+ cancel_work_sync(&listen_con.rwork);
dlm_close_sock(&listen_con.sock);
}
@@ -1615,6 +1588,7 @@ void dlm_lowcomms_shutdown_node(int nodeid, bool force)
return;
}
+ flush_work(&con->swork);
WARN_ON_ONCE(!force && !list_empty(&con->writequeue));
clean_one_writequeue(con);
if (con->othercon)
@@ -1736,8 +1710,17 @@ static int dlm_listen_for_all(void)
if (result < 0)
goto out;
- save_listen_callbacks(sock);
- add_listen_sock(sock, &listen_con);
+ lock_sock(sock->sk);
+ listen_sock.sk_data_ready = sock->sk->sk_data_ready;
+ listen_sock.sk_write_space = sock->sk->sk_write_space;
+ listen_sock.sk_error_report = sock->sk->sk_error_report;
+ listen_sock.sk_state_change = sock->sk->sk_state_change;
+
+ listen_con.sock = sock;
+
+ sock->sk->sk_allocation = GFP_NOFS;
+ sock->sk->sk_data_ready = lowcomms_listen_data_ready;
+ release_sock(sock->sk);
result = sock->ops->listen(sock, 5);
if (result < 0) {
--
2.31.1
* [Cluster-devel] [PATCH dlm/next 13/18] fs: dlm: don't put dlm_local_addrs on heap
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (10 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 12/18] fs: dlm: cleanup listen sock handling Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 14/18] fs: dlm: remove dlm_node_addrs lookup list Alexander Aring
` (4 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch stops allocating the dlm_local_addr[] entries on the heap.
Instead we directly store "struct sockaddr_storage" values. This removes
the function deinit_local() because it was only freeing that memory.
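Before and after, condensed from the diff below:

  /* before: heap pointers, kmemdup() in init_local() and
   * kfree() in deinit_local()
   */
  static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];

  /* after: values embedded directly, nothing to allocate or free */
  static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];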
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 38 ++++++++++++--------------------------
1 file changed, 12 insertions(+), 26 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 8bf09c3a0946..0883394cfbeb 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -165,7 +165,7 @@ static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);
static struct listen_connection listen_con;
-static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
+static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
/* Work queues */
@@ -383,7 +383,7 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
if (!sa_out)
return 0;
- if (dlm_local_addr[0]->ss_family == AF_INET) {
+ if (dlm_local_addr[0].ss_family == AF_INET) {
struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
@@ -683,7 +683,7 @@ static void add_sock(struct socket *sock, struct connection *con)
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
int *addr_len)
{
- saddr->ss_family = dlm_local_addr[0]->ss_family;
+ saddr->ss_family = dlm_local_addr[0].ss_family;
if (saddr->ss_family == AF_INET) {
struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
in4_addr->sin_port = cpu_to_be16(port);
@@ -1065,7 +1065,7 @@ static int sctp_bind_addrs(struct socket *sock, uint16_t port)
int i, addr_len, result = 0;
for (i = 0; i < dlm_local_count; i++) {
- memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
+ memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
make_sockaddr(&localaddr, port, &addr_len);
if (!i)
@@ -1085,7 +1085,7 @@ static int sctp_bind_addrs(struct socket *sock, uint16_t port)
/* Get local addresses */
static void init_local(void)
{
- struct sockaddr_storage sas, *addr;
+ struct sockaddr_storage sas;
int i;
dlm_local_count = 0;
@@ -1093,21 +1093,10 @@ static void init_local(void)
if (dlm_our_addr(&sas, i))
break;
- addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
- if (!addr)
- break;
- dlm_local_addr[dlm_local_count++] = addr;
+ memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
}
}
-static void deinit_local(void)
-{
- int i;
-
- for (i = 0; i < dlm_local_count; i++)
- kfree(dlm_local_addr[i]);
-}
-
static struct writequeue_entry *new_writequeue_entry(struct connection *con)
{
struct writequeue_entry *entry;
@@ -1463,7 +1452,7 @@ static void dlm_connect(struct connection *con)
}
/* Create a socket to communicate with */
- result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
+ result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
SOCK_STREAM, dlm_proto_ops->proto, &sock);
if (result < 0)
goto socket_err;
@@ -1679,7 +1668,6 @@ void dlm_lowcomms_stop(void)
foreach_conn(free_conn);
srcu_read_unlock(&connections_srcu, idx);
work_stop();
- deinit_local();
dlm_proto_ops = NULL;
}
@@ -1696,7 +1684,7 @@ static int dlm_listen_for_all(void)
if (result < 0)
return result;
- result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
+ result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
SOCK_STREAM, dlm_proto_ops->proto, &sock);
if (result < 0) {
log_print("Can't create comms socket: %d", result);
@@ -1743,7 +1731,7 @@ static int dlm_tcp_bind(struct socket *sock)
/* Bind to our cluster-known address connecting to avoid
* routing problems.
*/
- memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
+ memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
make_sockaddr(&src_addr, 0, &addr_len);
result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
@@ -1800,8 +1788,8 @@ static int dlm_tcp_listen_bind(struct socket *sock)
int addr_len;
/* Bind to our port */
- make_sockaddr(dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
- return sock->ops->bind(sock, (struct sockaddr *)dlm_local_addr[0],
+ make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
+ return sock->ops->bind(sock, (struct sockaddr *)&dlm_local_addr[0],
addr_len);
}
@@ -1891,7 +1879,7 @@ int dlm_lowcomms_start(void)
error = work_start();
if (error)
- goto fail_local;
+ goto fail;
/* Start listening */
switch (dlm_config.ci_protocol) {
@@ -1918,8 +1906,6 @@ int dlm_lowcomms_start(void)
dlm_proto_ops = NULL;
fail_proto_ops:
work_stop();
-fail_local:
- deinit_local();
fail:
return error;
}
--
2.31.1
* [Cluster-devel] [PATCH dlm/next 14/18] fs: dlm: remove dlm_node_addrs lookup list
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (11 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 13/18] fs: dlm: don't put dlm_local_addrs on heap Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 15/18] fs: dlm: use sock2con without checking null Alexander Aring
` (3 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch merges the dlm_node_addrs lookup list into the connection
structure. The list was a per-node mapping to configuration set up via
configfs; we don't need two lookup structures for that. The connection
hash now has the same lifetime the dlm_node_addrs entries had: we add
new entries only when the cluster is configured, not while new
connections are coming in; we remove a connection when a node gets
fenced; and we clean up all connections when the dlm exits. It should
behave the same and will even surface more issues, because we no longer
try to somehow keep those two data structures in sync with the current
cluster configuration.
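As a condensed sketch (the function name is made up for illustration;
the real flow lives in dlm_lowcomms_close() and dlm_lowcomms_exit()
below), the teardown of the now single lookup structure follows the
usual unhash-then-defer-free pattern on top of SRCU:

static void sketch_remove_connection(struct connection *con)
{
	spin_lock(&connections_lock);
	hlist_del_rcu(&con->list);
	spin_unlock(&connections_lock);

	/* readers inside srcu_read_lock() may still see con; the actual
	 * free in connection_release() runs only after the grace period
	 */
	call_srcu(&connections_srcu, &con->rcu, connection_release);
}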
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 290 ++++++++++++++++++++++------------------------
1 file changed, 136 insertions(+), 154 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 0883394cfbeb..ed3cd3757199 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -93,6 +93,11 @@ struct connection {
unsigned char *rx_buf;
int rx_buflen;
int rx_leftover;
+ int mark;
+ int addr_count;
+ int curr_addr_index;
+ struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
+ spinlock_t addrs_lock;
struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
@@ -131,15 +136,6 @@ struct dlm_msg {
struct kref ref;
};
-struct dlm_node_addr {
- struct list_head list;
- int nodeid;
- int mark;
- int addr_count;
- int curr_addr_index;
- struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
-};
-
struct dlm_proto_ops {
bool try_new_addr;
const char *name;
@@ -161,9 +157,6 @@ static struct listen_sock_callbacks {
void (*sk_write_space)(struct sock *);
} listen_sock;
-static LIST_HEAD(dlm_node_addrs);
-static DEFINE_SPINLOCK(dlm_node_addrs_spin);
-
static struct listen_connection listen_con;
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
@@ -306,17 +299,6 @@ static void foreach_conn(void (*conn_func)(struct connection *c))
}
}
-static struct dlm_node_addr *find_node_addr(int nodeid)
-{
- struct dlm_node_addr *na;
-
- list_for_each_entry(na, &dlm_node_addrs, list) {
- if (na->nodeid == nodeid)
- return na;
- }
- return NULL;
-}
-
static int addr_compare(const struct sockaddr_storage *x,
const struct sockaddr_storage *y)
{
@@ -350,38 +332,45 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
unsigned int *mark)
{
struct sockaddr_storage sas;
- struct dlm_node_addr *na;
+ struct connection *con;
+ int idx;
if (!dlm_local_count)
return -1;
- spin_lock(&dlm_node_addrs_spin);
- na = find_node_addr(nodeid);
- if (na && na->addr_count) {
- memcpy(&sas, na->addr[na->curr_addr_index],
- sizeof(struct sockaddr_storage));
+ idx = srcu_read_lock(&connections_srcu);
+ con = nodeid2con(nodeid, 0);
+ if (!con) {
+ srcu_read_unlock(&connections_srcu, idx);
+ return -ENOENT;
+ }
- if (try_new_addr) {
- na->curr_addr_index++;
- if (na->curr_addr_index == na->addr_count)
- na->curr_addr_index = 0;
- }
+ spin_lock(&con->addrs_lock);
+ if (!con->addr_count) {
+ spin_unlock(&con->addrs_lock);
+ srcu_read_unlock(&connections_srcu, idx);
+ return -ENOENT;
}
- spin_unlock(&dlm_node_addrs_spin);
- if (!na)
- return -EEXIST;
+ memcpy(&sas, &con->addr[con->curr_addr_index],
+ sizeof(struct sockaddr_storage));
- if (!na->addr_count)
- return -ENOENT;
+ if (try_new_addr) {
+ con->curr_addr_index++;
+ if (con->curr_addr_index == con->addr_count)
+ con->curr_addr_index = 0;
+ }
- *mark = na->mark;
+ *mark = con->mark;
+ spin_unlock(&con->addrs_lock);
if (sas_out)
memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
- if (!sa_out)
+ if (!sa_out) {
+ srcu_read_unlock(&connections_srcu, idx);
return 0;
+ }
if (dlm_local_addr[0].ss_family == AF_INET) {
struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
@@ -393,43 +382,46 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
ret6->sin6_addr = in6->sin6_addr;
}
+ srcu_read_unlock(&connections_srcu, idx);
return 0;
}
static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
unsigned int *mark)
{
- struct dlm_node_addr *na;
- int rv = -EEXIST;
- int addr_i;
-
- spin_lock(&dlm_node_addrs_spin);
- list_for_each_entry(na, &dlm_node_addrs, list) {
- if (!na->addr_count)
- continue;
-
- for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
- if (addr_compare(na->addr[addr_i], addr)) {
- *nodeid = na->nodeid;
- *mark = na->mark;
- rv = 0;
- goto unlock;
+ struct connection *con;
+ int i, idx, addr_i;
+
+ idx = srcu_read_lock(&connections_srcu);
+ for (i = 0; i < CONN_HASH_SIZE; i++) {
+ hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
+ WARN_ON_ONCE(!con->addr_count);
+
+ spin_lock(&con->addrs_lock);
+ for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
+ if (addr_compare(&con->addr[addr_i], addr)) {
+ *nodeid = con->nodeid;
+ *mark = con->mark;
+ spin_unlock(&con->addrs_lock);
+ srcu_read_unlock(&connections_srcu, idx);
+ return 0;
+ }
}
+ spin_unlock(&con->addrs_lock);
}
}
-unlock:
- spin_unlock(&dlm_node_addrs_spin);
- return rv;
+ srcu_read_unlock(&connections_srcu, idx);
+
+ return -ENOENT;
}
-/* caller need to held dlm_node_addrs_spin lock */
-static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na,
- const struct sockaddr_storage *addr)
+static bool dlm_lowcomms_con_has_addr(const struct connection *con,
+ const struct sockaddr_storage *addr)
{
int i;
- for (i = 0; i < na->addr_count; i++) {
- if (addr_compare(na->addr[i], addr))
+ for (i = 0; i < con->addr_count; i++) {
+ if (addr_compare(&con->addr[i], addr))
return true;
}
@@ -438,52 +430,42 @@ static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na,
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
- struct sockaddr_storage *new_addr;
- struct dlm_node_addr *new_node, *na;
- bool ret;
-
- new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
- if (!new_node)
- return -ENOMEM;
+ struct connection *con;
+ bool ret;
+ int idx;
- new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
- if (!new_addr) {
- kfree(new_node);
+ idx = srcu_read_lock(&connections_srcu);
+ con = nodeid2con(nodeid, GFP_NOFS);
+ if (!con) {
+ srcu_read_unlock(&connections_srcu, idx);
return -ENOMEM;
}
- memcpy(new_addr, addr, len);
-
- spin_lock(&dlm_node_addrs_spin);
- na = find_node_addr(nodeid);
- if (!na) {
- new_node->nodeid = nodeid;
- new_node->addr[0] = new_addr;
- new_node->addr_count = 1;
- new_node->mark = dlm_config.ci_mark;
- list_add(&new_node->list, &dlm_node_addrs);
- spin_unlock(&dlm_node_addrs_spin);
+ spin_lock(&con->addrs_lock);
+ if (!con->addr_count) {
+ memcpy(&con->addr[0], addr, sizeof(*addr));
+ con->addr_count = 1;
+ con->mark = dlm_config.ci_mark;
+ spin_unlock(&con->addrs_lock);
+ srcu_read_unlock(&connections_srcu, idx);
return 0;
}
- ret = dlm_lowcomms_na_has_addr(na, addr);
+ ret = dlm_lowcomms_con_has_addr(con, addr);
if (ret) {
- spin_unlock(&dlm_node_addrs_spin);
- kfree(new_addr);
- kfree(new_node);
+ spin_unlock(&con->addrs_lock);
+ srcu_read_unlock(&connections_srcu, idx);
return -EEXIST;
}
- if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
- spin_unlock(&dlm_node_addrs_spin);
- kfree(new_addr);
- kfree(new_node);
+ if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
+ spin_unlock(&con->addrs_lock);
+ srcu_read_unlock(&connections_srcu, idx);
return -ENOSPC;
}
- na->addr[na->addr_count++] = new_addr;
- spin_unlock(&dlm_node_addrs_spin);
- kfree(new_node);
+ memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
+ spin_unlock(&con->addrs_lock);
+ srcu_read_unlock(&connections_srcu, idx);
return 0;
}
@@ -558,10 +540,10 @@ int dlm_lowcomms_connect_node(int nodeid)
return 0;
idx = srcu_read_lock(&connections_srcu);
- con = nodeid2con(nodeid, GFP_NOFS);
- if (!con) {
+ con = nodeid2con(nodeid, 0);
+ if (WARN_ON_ONCE(!con)) {
srcu_read_unlock(&connections_srcu, idx);
- return -ENOMEM;
+ return -ENOENT;
}
lowcomms_connect_sock(con);
@@ -572,18 +554,20 @@ int dlm_lowcomms_connect_node(int nodeid)
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{
- struct dlm_node_addr *na;
+ struct connection *con;
+ int idx;
- spin_lock(&dlm_node_addrs_spin);
- na = find_node_addr(nodeid);
- if (!na) {
- spin_unlock(&dlm_node_addrs_spin);
+ idx = srcu_read_lock(&connections_srcu);
+ con = nodeid2con(nodeid, 0);
+ if (!con) {
+ srcu_read_unlock(&connections_srcu, idx);
return -ENOENT;
}
- na->mark = mark;
- spin_unlock(&dlm_node_addrs_spin);
-
+ spin_lock(&con->addrs_lock);
+ con->mark = mark;
+ spin_unlock(&con->addrs_lock);
+ srcu_read_unlock(&connections_srcu, idx);
return 0;
}
@@ -960,10 +944,10 @@ static int accept_from_sock(struct listen_connection *con)
* In this case we store the incoming one in "othercon"
*/
idx = srcu_read_lock(&connections_srcu);
- newcon = nodeid2con(nodeid, GFP_NOFS);
- if (!newcon) {
+ newcon = nodeid2con(nodeid, 0);
+ if (WARN_ON_ONCE(!newcon)) {
srcu_read_unlock(&connections_srcu, idx);
- result = -ENOMEM;
+ result = -ENOENT;
goto accept_err;
}
@@ -1210,8 +1194,8 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
}
idx = srcu_read_lock(&connections_srcu);
- con = nodeid2con(nodeid, allocation);
- if (!con) {
+ con = nodeid2con(nodeid, 0);
+ if (WARN_ON_ONCE(!con)) {
srcu_read_unlock(&connections_srcu, idx);
return NULL;
}
@@ -1376,35 +1360,44 @@ static void clean_one_writequeue(struct connection *con)
spin_unlock(&con->writequeue_lock);
}
+static void connection_release(struct rcu_head *rcu)
+{
+ struct connection *con = container_of(rcu, struct connection, rcu);
+
+ kfree(con->rx_buf);
+ kfree(con);
+}
+
/* Called from recovery when it knows that a node has
left the cluster */
int dlm_lowcomms_close(int nodeid)
{
struct connection *con;
- struct dlm_node_addr *na;
int idx;
log_print("closing connection to node %d", nodeid);
+
idx = srcu_read_lock(&connections_srcu);
con = nodeid2con(nodeid, 0);
- if (con) {
- set_bit(CF_CLOSE, &con->flags);
- close_connection(con, true, true, true);
- clean_one_writequeue(con);
- if (con->othercon)
- clean_one_writequeue(con->othercon);
+ if (WARN_ON_ONCE(!con)) {
+ srcu_read_unlock(&connections_srcu, idx);
+ return -ENOENT;
}
- srcu_read_unlock(&connections_srcu, idx);
- spin_lock(&dlm_node_addrs_spin);
- na = find_node_addr(nodeid);
- if (na) {
- list_del(&na->list);
- while (na->addr_count--)
- kfree(na->addr[na->addr_count]);
- kfree(na);
+ spin_lock(&connections_lock);
+ hlist_del_rcu(&con->list);
+ spin_unlock(&connections_lock);
+
+ close_connection(con, true, true, true);
+
+ clean_one_writequeue(con);
+ call_srcu(&connections_srcu, &con->rcu, connection_release);
+ if (con->othercon) {
+ clean_one_writequeue(con->othercon);
+ call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
}
- spin_unlock(&dlm_node_addrs_spin);
+ srcu_read_unlock(&connections_srcu, idx);
return 0;
}
@@ -1607,27 +1600,9 @@ static void stop_conn(struct connection *con)
_stop_conn(con, true);
}
-static void connection_release(struct rcu_head *rcu)
-{
- struct connection *con = container_of(rcu, struct connection, rcu);
-
- kfree(con->rx_buf);
- kfree(con);
-}
-
static void free_conn(struct connection *con)
{
close_connection(con, true, true, true);
- spin_lock(&connections_lock);
- hlist_del_rcu(&con->list);
- spin_unlock(&connections_lock);
- if (con->othercon) {
- clean_one_writequeue(con->othercon);
- call_srcu(&connections_srcu, &con->othercon->rcu,
- connection_release);
- }
- clean_one_writequeue(con);
- call_srcu(&connections_srcu, &con->rcu, connection_release);
}
static void work_flush(void)
@@ -1922,14 +1897,21 @@ void dlm_lowcomms_init(void)
void dlm_lowcomms_exit(void)
{
- struct dlm_node_addr *na, *safe;
+ struct connection *con;
+ int i, idx;
- spin_lock(&dlm_node_addrs_spin);
- list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
- list_del(&na->list);
- while (na->addr_count--)
- kfree(na->addr[na->addr_count]);
- kfree(na);
+ idx = srcu_read_lock(&connections_srcu);
+ for (i = 0; i < CONN_HASH_SIZE; i++) {
+ hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
+ spin_lock(&connections_lock);
+ hlist_del_rcu(&con->list);
+ spin_unlock(&connections_lock);
+
+ if (con->othercon)
+ call_srcu(&connections_srcu, &con->othercon->rcu,
+ connection_release);
+ call_srcu(&connections_srcu, &con->rcu, connection_release);
+ }
}
- spin_unlock(&dlm_node_addrs_spin);
+ srcu_read_unlock(&connections_srcu, idx);
}
--
2.31.1
^ permalink raw reply related [flat|nested] 18+ messages in thread
* [Cluster-devel] [PATCH dlm/next 15/18] fs: dlm: use sock2con without checking null
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (12 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 14/18] fs: dlm: remove dlm_node_addrs lookup list Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 16/18] fs: dlm: use saved sk_error_report() Alexander Aring
` (2 subsequent siblings)
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch removes the NULL checks on the sockets' private data. If a
NULL dereference happens there, we have a bug in our implementation:
such a callback must never occur in this state.
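A minimal sketch of the invariant that makes the checks removable
(simplified from add_sock() in lowcomms.c): the private data is
assigned before any dlm callback is installed, all under the socket
lock, so a dlm callback can never observe sk_user_data == NULL:

static void sketch_add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	lock_sock(sk);
	sk->sk_user_data = con;			/* set first ... */
	sk->sk_data_ready = lowcomms_data_ready; /* ... callbacks after */
	release_sock(sk);
}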
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 17 ++++-------------
1 file changed, 4 insertions(+), 13 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index ed3cd3757199..677e31144ca0 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -472,10 +472,9 @@ int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
- struct connection *con;
+ struct connection *con = sock2con(sk);
- con = sock2con(sk);
- if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
+ if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
queue_work(recv_workqueue, &con->rwork);
}
@@ -486,11 +485,7 @@ static void lowcomms_listen_data_ready(struct sock *sk)
static void lowcomms_write_space(struct sock *sk)
{
- struct connection *con;
-
- con = sock2con(sk);
- if (!con)
- return;
+ struct connection *con = sock2con(sk);
if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
log_print("connected to node %d", con->nodeid);
@@ -573,14 +568,10 @@ int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
static void lowcomms_error_report(struct sock *sk)
{
- struct connection *con;
+ struct connection *con = sock2con(sk);
void (*orig_report)(struct sock *) = NULL;
struct inet_sock *inet;
- con = sock2con(sk);
- if (con == NULL)
- goto out;
-
orig_report = listen_sock.sk_error_report;
inet = inet_sk(sk);
--
2.31.1
^ permalink raw reply related [flat|nested] 18+ messages in thread
* [Cluster-devel] [PATCH dlm/next 16/18] fs: dlm: use saved sk_error_report()
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (13 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 15/18] fs: dlm: use sock2con without checking null Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 17/18] fs: dlm: don't init error value Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 18/18] fs: dlm: parallelize lowcomms socket handling Alexander Aring
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch changes the handling of calling the original
sk_error_report(): we no longer put it on the stack and call it later,
but call listen_sock.sk_error_report() directly. If it is NULL at this
moment, that indicates a bug in our implementation.
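The resulting call pattern, as a short sketch (hypothetical function
body, reduced from lowcomms_error_report() in the diff below):

static void sketch_error_report(struct sock *sk)
{
	/* ... dlm specific logging and resend handling ... */
	listen_sock.sk_error_report(sk);	/* assumed always valid */
}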
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 6 +-----
1 file changed, 1 insertion(+), 5 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 677e31144ca0..643f9810ec35 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -569,11 +569,8 @@ int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
static void lowcomms_error_report(struct sock *sk)
{
struct connection *con = sock2con(sk);
- void (*orig_report)(struct sock *) = NULL;
struct inet_sock *inet;
- orig_report = listen_sock.sk_error_report;
-
inet = inet_sk(sk);
switch (sk->sk_family) {
case AF_INET:
@@ -618,8 +615,7 @@ static void lowcomms_error_report(struct sock *sk)
queue_work(send_workqueue, &con->swork);
out:
- if (orig_report)
- orig_report(sk);
+ listen_sock.sk_error_report(sk);
}
static void restore_callbacks(struct socket *sock)
--
2.31.1
^ permalink raw reply related [flat|nested] 18+ messages in thread
* [Cluster-devel] [PATCH dlm/next 17/18] fs: dlm: don't init error value
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (14 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 16/18] fs: dlm: use saved sk_error_report() Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 18/18] fs: dlm: parallelize lowcomms socket handling Alexander Aring
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch removes an unnecessary initialization of an error value to
-EINVAL.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 643f9810ec35..c6b91f67a2c2 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1830,7 +1830,7 @@ static const struct dlm_proto_ops dlm_sctp_ops = {
int dlm_lowcomms_start(void)
{
- int error = -EINVAL;
+ int error;
init_local();
if (!dlm_local_count) {
--
2.31.1
^ permalink raw reply related [flat|nested] 18+ messages in thread
* [Cluster-devel] [PATCH dlm/next 18/18] fs: dlm: parallelize lowcomms socket handling
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
` (15 preceding siblings ...)
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 17/18] fs: dlm: don't init error value Alexander Aring
@ 2022-11-17 22:11 ` Alexander Aring
16 siblings, 0 replies; 18+ messages in thread
From: Alexander Aring @ 2022-11-17 22:11 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch is a rework of the lowcomms handling; the main goal is to
let recvmsg() and sendpage() run in parallel. Parallel in two senses:
1. per connection and 2. recvmsg()/sendpage() don't block each other.
Currently recvmsg()/sendpage() cannot run in parallel because the two
workqueues "dlm_recv" and "dlm_send" are ordered workqueues, meaning
only one work item can execute at a time. The number of queued items
grows with the number of nodes in the cluster. The two workqueues for
sending and receiving can also block each other if the same connection
is handled at the same time in the dlm_recv and dlm_send workqueues,
because of a per-connection mutex for the socket handling.
To make it more parallel we introduce one "dlm_io" workqueue which is
not an ordered workqueue, the amount of workers are not limited. Due
per connection flags SEND/RECV pending we schedule workers ordered per
connection and per send and receive task. To get rid of the mutex
blocking same workers to do socket handling we switched to a semaphore
which handles socket operations as read lock and sock releases as write
operations, to prevent sock_release() being called while the socket is
being used.
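A reduced sketch of that locking rule (the helper names here are made
up; the patch below open-codes this in the workers):

/* io workers take the read side, so recv and send can run in parallel */
static void sketch_io_worker(struct connection *con)
{
	down_read(&con->sock_lock);
	if (con->sock) {
		/* socket io, e.g. kernel_recvmsg()/kernel_sendpage() */
	}
	up_read(&con->sock_lock);
}

/* releasing a socket takes the write side and thus waits for all io */
static void sketch_release(struct connection *con)
{
	down_write(&con->sock_lock);
	if (con->sock)
		dlm_close_sock(&con->sock);
	up_write(&con->sock_lock);
}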
There might be further optimization in removing the semaphore and
replacing it with another synchronization mechanism; however, due to
other circumstances, e.g. the othercon behaviour, this change seems
complicated to make. I added comments about removing the othercon
handling and moving to a different synchronization mechanism once that
is done. We need to do that in the next dlm major version upgrade
because it is not backwards compatible with the current connect
mechanism.
The processing of dlm messages still needs to be handled by an ordered
workqueue. An ordered "dlm_process" workqueue was introduced which gets
filled by the receive workers, as sketched below. This is probably the
next bottleneck of DLM, but the application currently can't parse dlm
messages in parallel. A comment was introduced about lifting dlm
message processing out of the workqueue context into a non-sleepable
softirq to get message processing done fast.
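The producer side of that queue, condensed from the patch below (same
names, shortened for illustration): the parallel receive workers only
append raw messages and kick the single ordered work item once:

	spin_lock(&processqueue_lock);
	list_add_tail(&pentry->list, &processqueue);
	if (!process_dlm_messages_pending) {
		process_dlm_messages_pending = true;
		queue_work(process_workqueue, &process_work);
	}
	spin_unlock(&processqueue_lock);

process_dlm_messages() then drains the list entry by entry, so message
processing stays strictly ordered even though receiving runs in
parallel.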
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 1024 ++++++++++++++++++++++++---------------------
fs/dlm/midcomms.c | 45 +-
fs/dlm/midcomms.h | 1 +
3 files changed, 586 insertions(+), 484 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index c6b91f67a2c2..799d1c36eabf 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -63,35 +63,43 @@
#define NEEDED_RMEM (4*1024*1024)
-/* Number of messages to send before rescheduling */
-#define MAX_SEND_MSG_COUNT 25
-
struct connection {
struct socket *sock; /* NULL if not connected */
uint32_t nodeid; /* So we know who we are in the list */
- struct mutex sock_mutex;
+ /* this semaphore is used to allow parallel recv/send in read
+ * lock mode. When we release a sock we need to hold the write lock.
+ *
+ * However this locking code is not nice. When we remove the
+ * othercon handling we can look into another mechanism to
+ * synchronize io handling and call sock_release() at the right time.
+ */
+ struct rw_semaphore sock_lock;
unsigned long flags;
-#define CF_READ_PENDING 1
-#define CF_WRITE_PENDING 2
-#define CF_INIT_PENDING 4
+#define CF_APP_LIMITED 0
+#define CF_RECV_PENDING 1
+#define CF_SEND_PENDING 2
+#define CF_RECV_INTR 3
+#define CF_IO_STOP 4
#define CF_IS_OTHERCON 5
-#define CF_CLOSE 6
-#define CF_APP_LIMITED 7
-#define CF_CLOSING 8
-#define CF_CONNECTED 9
-#define CF_RECONNECT 10
-#define CF_DELAY_CONNECT 11
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
int retries;
-#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
+ /* due to some connect()/accept() races we currently have this
+ * cross-over connection attempt, a second connection for one node.
+ *
+ * There is a solution to avoid the race by introducing a connect
+ * rule such as our_nodeid > nodeid_to_connect deciding who is
+ * allowed to connect. The other side can still connect, but that
+ * will only be taken to mean the other side wants a reconnect.
+ *
+ * However changing to this behaviour would break backwards
+ * compatibility. In a DLM protocol major version upgrade we should
+ * remove this!
+ */
struct connection *othercon;
- struct connection *sendcon;
- struct work_struct rwork; /* Receive workqueue */
- struct work_struct swork; /* Send workqueue */
- unsigned char *rx_buf;
- int rx_buflen;
+ struct work_struct rwork; /* receive worker */
+ struct work_struct swork; /* send worker */
+ unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
int rx_leftover;
int mark;
int addr_count;
@@ -136,6 +144,14 @@ struct dlm_msg {
struct kref ref;
};
+struct processqueue_entry {
+ unsigned char *buf;
+ int nodeid;
+ int buflen;
+
+ struct list_head list;
+};
+
struct dlm_proto_ops {
bool try_new_addr;
const char *name;
@@ -162,8 +178,8 @@ static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
/* Work queues */
-static struct workqueue_struct *recv_workqueue;
-static struct workqueue_struct *send_workqueue;
+static struct workqueue_struct *io_workqueue;
+static struct workqueue_struct *process_workqueue;
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
@@ -171,14 +187,44 @@ DEFINE_STATIC_SRCU(connections_srcu);
static const struct dlm_proto_ops *dlm_proto_ops;
+#define DLM_IO_SUCCESS 0
+#define DLM_IO_END 1
+#define DLM_IO_EOF 2
+#define DLM_IO_RESCHED 3
+
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
+static void process_dlm_messages(struct work_struct *work);
+
+static DECLARE_WORK(process_work, process_dlm_messages);
+static DEFINE_SPINLOCK(processqueue_lock);
+static bool process_dlm_messages_pending;
+static LIST_HEAD(processqueue);
bool dlm_lowcomms_is_running(void)
{
return !!listen_con.sock;
}
+static void lowcomms_queue_swork(struct connection *con)
+{
+ WARN_ON_ONCE(!lockdep_is_held(&con->writequeue_lock));
+
+ if (!test_bit(CF_IO_STOP, &con->flags) &&
+ !test_bit(CF_APP_LIMITED, &con->flags) &&
+ !test_and_set_bit(CF_SEND_PENDING, &con->flags))
+ queue_work(io_workqueue, &con->swork);
+}
+
+static void lowcomms_queue_rwork(struct connection *con)
+{
+ WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
+
+ if (!test_bit(CF_IO_STOP, &con->flags) &&
+ !test_and_set_bit(CF_RECV_PENDING, &con->flags))
+ queue_work(io_workqueue, &con->rwork);
+}
+
static void writequeue_entry_ctor(void *data)
{
struct writequeue_entry *entry = data;
@@ -225,21 +271,15 @@ static struct connection *__find_con(int nodeid, int r)
return NULL;
}
-static int dlm_con_init(struct connection *con, int nodeid)
+static void dlm_con_init(struct connection *con, int nodeid)
{
- con->rx_buflen = dlm_config.ci_buffer_size;
- con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
- if (!con->rx_buf)
- return -ENOMEM;
-
con->nodeid = nodeid;
- mutex_init(&con->sock_mutex);
+ init_rwsem(&con->sock_lock);
INIT_LIST_HEAD(&con->writequeue);
spin_lock_init(&con->writequeue_lock);
INIT_WORK(&con->swork, process_send_sockets);
INIT_WORK(&con->rwork, process_recv_sockets);
-
- return 0;
+ spin_lock_init(&con->addrs_lock);
}
/*
@@ -249,7 +289,7 @@ static int dlm_con_init(struct connection *con, int nodeid)
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
struct connection *con, *tmp;
- int r, ret;
+ int r;
r = nodeid_hash(nodeid);
con = __find_con(nodeid, r);
@@ -260,11 +300,7 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
if (!con)
return NULL;
- ret = dlm_con_init(con, nodeid);
- if (ret) {
- kfree(con);
- return NULL;
- }
+ dlm_con_init(con, nodeid);
spin_lock(&connections_lock);
/* Because multiple workqueues/threads calls this function it can
@@ -276,7 +312,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
tmp = __find_con(nodeid, r);
if (tmp) {
spin_unlock(&connections_lock);
- kfree(con->rx_buf);
kfree(con);
return tmp;
}
@@ -287,18 +322,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
return con;
}
-/* Loop round all connections */
-static void foreach_conn(void (*conn_func)(struct connection *c))
-{
- int i;
- struct connection *con;
-
- for (i = 0; i < CONN_HASH_SIZE; i++) {
- hlist_for_each_entry_rcu(con, &connection_hash[i], list)
- conn_func(con);
- }
-}
-
static int addr_compare(const struct sockaddr_storage *x,
const struct sockaddr_storage *y)
{
@@ -474,56 +497,38 @@ static void lowcomms_data_ready(struct sock *sk)
{
struct connection *con = sock2con(sk);
- if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
- queue_work(recv_workqueue, &con->rwork);
-}
-
-static void lowcomms_listen_data_ready(struct sock *sk)
-{
- queue_work(recv_workqueue, &listen_con.rwork);
+ set_bit(CF_RECV_INTR, &con->flags);
+ lowcomms_queue_rwork(con);
}
static void lowcomms_write_space(struct sock *sk)
{
struct connection *con = sock2con(sk);
- if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
- log_print("connected to node %d", con->nodeid);
- queue_work(send_workqueue, &con->swork);
- return;
- }
-
clear_bit(SOCK_NOSPACE, &con->sock->flags);
+ spin_lock_bh(&con->writequeue_lock);
if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
con->sock->sk->sk_write_pending--;
clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
}
- queue_work(send_workqueue, &con->swork);
-}
-
-static inline void lowcomms_connect_sock(struct connection *con)
-{
- if (test_bit(CF_CLOSE, &con->flags))
- return;
- queue_work(send_workqueue, &con->swork);
- cond_resched();
+ lowcomms_queue_swork(con);
+ spin_unlock_bh(&con->writequeue_lock);
}
static void lowcomms_state_change(struct sock *sk)
{
/* SCTP layer is not calling sk_data_ready when the connection
- * is done, so we catch the signal through here. Also, it
- * doesn't switch socket state when entering shutdown, so we
- * skip the write in that case.
+ * is done, so we catch the signal through here.
*/
- if (sk->sk_shutdown) {
- if (sk->sk_shutdown == RCV_SHUTDOWN)
- lowcomms_data_ready(sk);
- } else if (sk->sk_state == TCP_ESTABLISHED) {
- lowcomms_write_space(sk);
- }
+ if (sk->sk_shutdown == RCV_SHUTDOWN)
+ lowcomms_data_ready(sk);
+}
+
+static void lowcomms_listen_data_ready(struct sock *sk)
+{
+ queue_work(io_workqueue, &listen_con.rwork);
}
int dlm_lowcomms_connect_node(int nodeid)
@@ -541,9 +546,16 @@ int dlm_lowcomms_connect_node(int nodeid)
return -ENOENT;
}
- lowcomms_connect_sock(con);
+ down_read(&con->sock_lock);
+ if (!con->sock) {
+ spin_lock_bh(&con->writequeue_lock);
+ lowcomms_queue_swork(con);
+ spin_unlock_bh(&con->writequeue_lock);
+ }
+ up_read(&con->sock_lock);
srcu_read_unlock(&connections_srcu, idx);
+ cond_resched();
return 0;
}
@@ -596,39 +608,23 @@ static void lowcomms_error_report(struct sock *sk)
"invalid socket family %d set, "
"sk_err=%d/%d\n", dlm_our_nodeid(),
sk->sk_family, sk->sk_err, sk->sk_err_soft);
- goto out;
- }
-
- /* below sendcon only handling */
- if (test_bit(CF_IS_OTHERCON, &con->flags))
- con = con->sendcon;
-
- switch (sk->sk_err) {
- case ECONNREFUSED:
- set_bit(CF_DELAY_CONNECT, &con->flags);
- break;
- default:
break;
}
- if (!test_and_set_bit(CF_RECONNECT, &con->flags))
- queue_work(send_workqueue, &con->swork);
+ dlm_midcomms_unack_msg_resend(con->nodeid);
-out:
listen_sock.sk_error_report(sk);
}
-static void restore_callbacks(struct socket *sock)
+static void restore_callbacks(struct sock *sk)
{
- struct sock *sk = sock->sk;
+ WARN_ON_ONCE(!lockdep_sock_is_held(sk));
- lock_sock(sk);
sk->sk_user_data = NULL;
sk->sk_data_ready = listen_sock.sk_data_ready;
sk->sk_state_change = listen_sock.sk_state_change;
sk->sk_write_space = listen_sock.sk_write_space;
sk->sk_error_report = listen_sock.sk_error_report;
- release_sock(sk);
}
/* Make a socket active */
@@ -640,10 +636,10 @@ static void add_sock(struct socket *sock, struct connection *con)
con->sock = sock;
sk->sk_user_data = con;
- /* Install a data_ready callback */
sk->sk_data_ready = lowcomms_data_ready;
sk->sk_write_space = lowcomms_write_space;
- sk->sk_state_change = lowcomms_state_change;
+ if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
+ sk->sk_state_change = lowcomms_state_change;
sk->sk_allocation = GFP_NOFS;
sk->sk_error_report = lowcomms_error_report;
release_sock(sk);
@@ -705,37 +701,62 @@ static void free_entry(struct writequeue_entry *e)
static void dlm_close_sock(struct socket **sock)
{
- if (*sock) {
- restore_callbacks(*sock);
- sock_release(*sock);
- *sock = NULL;
+ lock_sock((*sock)->sk);
+ restore_callbacks((*sock)->sk);
+ release_sock((*sock)->sk);
+
+ sock_release(*sock);
+ *sock = NULL;
+}
+
+static void allow_connection_io(struct connection *con)
+{
+ if (con->othercon)
+ clear_bit(CF_IO_STOP, &con->othercon->flags);
+ clear_bit(CF_IO_STOP, &con->flags);
+}
+
+static void stop_connection_io(struct connection *con)
+{
+ if (con->othercon)
+ stop_connection_io(con->othercon);
+
+ down_write(&con->sock_lock);
+ if (con->sock) {
+ lock_sock(con->sock->sk);
+ restore_callbacks(con->sock->sk);
+
+ spin_lock_bh(&con->writequeue_lock);
+ set_bit(CF_IO_STOP, &con->flags);
+ spin_unlock_bh(&con->writequeue_lock);
+ release_sock(con->sock->sk);
+ } else {
+ spin_lock_bh(&con->writequeue_lock);
+ set_bit(CF_IO_STOP, &con->flags);
+ spin_unlock_bh(&con->writequeue_lock);
}
+ up_write(&con->sock_lock);
+
+ cancel_work_sync(&con->swork);
+ cancel_work_sync(&con->rwork);
}
/* Close a remote connection and tidy up */
-static void close_connection(struct connection *con, bool and_other,
- bool tx, bool rx)
+static void close_connection(struct connection *con, bool and_other)
{
- bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
struct writequeue_entry *e;
- if (tx && !closing && cancel_work_sync(&con->swork)) {
- log_print("canceled swork for node %d", con->nodeid);
- clear_bit(CF_WRITE_PENDING, &con->flags);
- }
- if (rx && !closing && cancel_work_sync(&con->rwork)) {
- log_print("canceled rwork for node %d", con->nodeid);
- clear_bit(CF_READ_PENDING, &con->flags);
+ if (con->othercon && and_other)
+ close_connection(con->othercon, false);
+
+ down_write(&con->sock_lock);
+ if (!con->sock) {
+ up_write(&con->sock_lock);
+ return;
}
- mutex_lock(&con->sock_mutex);
dlm_close_sock(&con->sock);
- if (con->othercon && and_other) {
- /* Will only re-enter once. */
- close_connection(con->othercon, false, tx, rx);
- }
-
/* if we send a writequeue entry only a half way, we drop the
* whole entry because reconnection and that we not start of the
* middle of a msg which will confuse the other end.
@@ -747,143 +768,209 @@ static void close_connection(struct connection *con, bool and_other,
* our policy is to start on a clean state when disconnects, we don't
* know what's send/received on transport layer in this case.
*/
- spin_lock(&con->writequeue_lock);
+ spin_lock_bh(&con->writequeue_lock);
if (!list_empty(&con->writequeue)) {
e = list_first_entry(&con->writequeue, struct writequeue_entry,
list);
if (e->dirty)
free_entry(e);
}
- spin_unlock(&con->writequeue_lock);
+ spin_unlock_bh(&con->writequeue_lock);
con->rx_leftover = 0;
con->retries = 0;
clear_bit(CF_APP_LIMITED, &con->flags);
- clear_bit(CF_CONNECTED, &con->flags);
- clear_bit(CF_DELAY_CONNECT, &con->flags);
- clear_bit(CF_RECONNECT, &con->flags);
- mutex_unlock(&con->sock_mutex);
- clear_bit(CF_CLOSING, &con->flags);
+ clear_bit(CF_RECV_PENDING, &con->flags);
+ clear_bit(CF_SEND_PENDING, &con->flags);
+ up_write(&con->sock_lock);
}
-static int con_realloc_receive_buf(struct connection *con, int newlen)
+static struct processqueue_entry *new_processqueue_entry(int nodeid,
+ int buflen)
{
- unsigned char *newbuf;
-
- newbuf = kmalloc(newlen, GFP_NOFS);
- if (!newbuf)
- return -ENOMEM;
+ struct processqueue_entry *pentry;
- /* copy any leftover from last receive */
- if (con->rx_leftover)
- memmove(newbuf, con->rx_buf, con->rx_leftover);
+ pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
+ if (!pentry)
+ return NULL;
- /* swap to new buffer space */
- kfree(con->rx_buf);
- con->rx_buflen = newlen;
- con->rx_buf = newbuf;
+ pentry->buf = kmalloc(buflen, GFP_NOFS);
+ if (!pentry->buf) {
+ kfree(pentry);
+ return NULL;
+ }
- return 0;
+ pentry->nodeid = nodeid;
+ return pentry;
}
-/* Data received from remote end */
-static int receive_from_sock(struct connection *con)
+static void free_processqueue_entry(struct processqueue_entry *pentry)
{
- struct msghdr msg;
- struct kvec iov;
- int ret, buflen;
+ kfree(pentry->buf);
+ kfree(pentry);
+}
+
+struct dlm_processed_nodes {
+ int nodeid;
- mutex_lock(&con->sock_mutex);
+ struct list_head list;
+};
- if (con->sock == NULL) {
- ret = -EAGAIN;
- goto out_close;
+static void add_processed_node(int nodeid, struct list_head *processed_nodes)
+{
+ struct dlm_processed_nodes *n;
+
+ list_for_each_entry(n, processed_nodes, list) {
+ /* we already remembered this node */
+ if (n->nodeid == nodeid)
+ return;
}
- /* realloc if we get new buffer size to read out */
- buflen = dlm_config.ci_buffer_size;
- if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
- ret = con_realloc_receive_buf(con, buflen);
- if (ret < 0)
- goto out_resched;
+ /* if it fails, in the worst case we simply don't send an ack back.
+ * We try again next time.
+ */
+ n = kmalloc(sizeof(*n), GFP_NOFS);
+ if (!n)
+ return;
+
+ n->nodeid = nodeid;
+ list_add(&n->list, processed_nodes);
+}
+
+static void process_dlm_messages(struct work_struct *work)
+{
+ struct dlm_processed_nodes *n, *n_tmp;
+ struct processqueue_entry *pentry;
+ LIST_HEAD(processed_nodes);
+
+ spin_lock(&processqueue_lock);
+ pentry = list_first_entry_or_null(&processqueue,
+ struct processqueue_entry, list);
+ if (WARN_ON_ONCE(!pentry)) {
+ spin_unlock(&processqueue_lock);
+ return;
}
+ list_del(&pentry->list);
+ spin_unlock(&processqueue_lock);
+
for (;;) {
- /* calculate new buffer parameter regarding last receive and
- * possible leftover bytes
- */
- iov.iov_base = con->rx_buf + con->rx_leftover;
- iov.iov_len = con->rx_buflen - con->rx_leftover;
-
- memset(&msg, 0, sizeof(msg));
- msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
- ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
- msg.msg_flags);
- trace_dlm_recv(con->nodeid, ret);
- if (ret == -EAGAIN)
+ dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
+ pentry->buflen);
+ add_processed_node(pentry->nodeid, &processed_nodes);
+ free_processqueue_entry(pentry);
+
+ spin_lock(&processqueue_lock);
+ pentry = list_first_entry_or_null(&processqueue,
+ struct processqueue_entry, list);
+ if (!pentry) {
+ process_dlm_messages_pending = false;
+ spin_unlock(&processqueue_lock);
break;
- else if (ret <= 0)
- goto out_close;
-
- /* new buflen according readed bytes and leftover from last receive */
- buflen = ret + con->rx_leftover;
- ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
- if (ret < 0)
- goto out_close;
-
- /* calculate leftover bytes from process and put it into begin of
- * the receive buffer, so next receive we have the full message
- *@the start address of the receive buffer.
- */
- con->rx_leftover = buflen - ret;
- if (con->rx_leftover) {
- memmove(con->rx_buf, con->rx_buf + ret,
- con->rx_leftover);
}
+
+ list_del(&pentry->list);
+ spin_unlock(&processqueue_lock);
+ }
+
+ /* send ack back after we processed a couple of messages */
+ list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
+ list_del(&n->list);
+ dlm_midcomms_receive_done(n->nodeid);
+ kfree(n);
+ }
+}
+
+/* Data received from remote end */
+static int receive_from_sock(struct connection *con, int buflen)
+{
+ struct processqueue_entry *pentry;
+ int ret, buflen_real;
+ struct msghdr msg;
+ struct kvec iov;
+
+ pentry = new_processqueue_entry(con->nodeid, buflen);
+ if (!pentry)
+ return DLM_IO_RESCHED;
+
+ memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);
+
+ /* calculate new buffer parameter regarding last receive and
+ * possible leftover bytes
+ */
+ iov.iov_base = pentry->buf + con->rx_leftover;
+ iov.iov_len = buflen - con->rx_leftover;
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+ clear_bit(CF_RECV_INTR, &con->flags);
+again:
+ ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
+ msg.msg_flags);
+ trace_dlm_recv(con->nodeid, ret);
+ if (ret == -EAGAIN) {
+ lock_sock(con->sock->sk);
+ if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
+ release_sock(con->sock->sk);
+ goto again;
+ }
+
+ clear_bit(CF_RECV_PENDING, &con->flags);
+ release_sock(con->sock->sk);
+ free_processqueue_entry(pentry);
+ return DLM_IO_END;
+ } else if (ret == 0) {
+ /* close will clear CF_RECV_PENDING */
+ free_processqueue_entry(pentry);
+ return DLM_IO_EOF;
+ } else if (ret < 0) {
+ free_processqueue_entry(pentry);
+ return ret;
}
- dlm_midcomms_receive_done(con->nodeid);
- mutex_unlock(&con->sock_mutex);
- return 0;
+ /* new buflen according to received bytes and leftover from last receive */
+ buflen_real = ret + con->rx_leftover;
+ ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
+ buflen_real);
+ if (ret < 0) {
+ free_processqueue_entry(pentry);
+ return ret;
+ }
-out_resched:
- if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
- queue_work(recv_workqueue, &con->rwork);
- mutex_unlock(&con->sock_mutex);
- return -EAGAIN;
-
-out_close:
- if (ret == 0) {
- log_print("connection %p got EOF from %d",
- con, con->nodeid);
-
- mutex_unlock(&con->sock_mutex);
- close_connection(con, false, true, false);
- /* signal to breaking receive worker */
- ret = -1;
- } else {
- mutex_unlock(&con->sock_mutex);
+ pentry->buflen = ret;
+
+ /* calculate leftover bytes from processing and put them at the
+ * beginning of the receive buffer, so on the next receive we have
+ * the full message at the start address of the receive buffer.
+ */
+ con->rx_leftover = buflen_real - ret;
+ memmove(con->rx_leftover_buf, pentry->buf + ret,
+ con->rx_leftover);
+
+ spin_lock(&processqueue_lock);
+ list_add_tail(&pentry->list, &processqueue);
+ if (!process_dlm_messages_pending) {
+ process_dlm_messages_pending = true;
+ queue_work(process_workqueue, &process_work);
}
- return ret;
+ spin_unlock(&processqueue_lock);
+
+ return DLM_IO_SUCCESS;
}
/* Listening socket is busy, accept a connection */
-static int accept_from_sock(struct listen_connection *con)
+static int accept_from_sock(void)
{
- int result;
struct sockaddr_storage peeraddr;
- struct socket *newsock;
- int len, idx;
- int nodeid;
+ int len, idx, result, nodeid;
struct connection *newcon;
- struct connection *addcon;
+ struct socket *newsock;
unsigned int mark;
- if (!con->sock)
- return -ENOTCONN;
-
- result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
- if (result < 0)
+ result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
+ if (result == -EAGAIN)
+ return DLM_IO_END;
+ else if (result < 0)
goto accept_err;
/* Get the connected socket's peer */
@@ -940,7 +1027,7 @@ static int accept_from_sock(struct listen_connection *con)
sock_set_mark(newsock->sk, mark);
- mutex_lock(&newcon->sock_mutex);
+ down_write(&newcon->sock_lock);
if (newcon->sock) {
struct connection *othercon = newcon->othercon;
@@ -948,63 +1035,50 @@ static int accept_from_sock(struct listen_connection *con)
othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
if (!othercon) {
log_print("failed to allocate incoming socket");
- mutex_unlock(&newcon->sock_mutex);
+ up_write(&newcon->sock_lock);
srcu_read_unlock(&connections_srcu, idx);
result = -ENOMEM;
goto accept_err;
}
- result = dlm_con_init(othercon, nodeid);
- if (result < 0) {
- kfree(othercon);
- mutex_unlock(&newcon->sock_mutex);
- srcu_read_unlock(&connections_srcu, idx);
- goto accept_err;
- }
-
- lockdep_set_subclass(&othercon->sock_mutex, 1);
- set_bit(CF_IS_OTHERCON, &othercon->flags);
+ dlm_con_init(othercon, nodeid);
+ lockdep_set_subclass(&othercon->sock_lock, 1);
newcon->othercon = othercon;
- othercon->sendcon = newcon;
+ set_bit(CF_IS_OTHERCON, &othercon->flags);
} else {
/* close other sock con if we have something new */
- close_connection(othercon, false, true, false);
+ close_connection(othercon, false);
}
- mutex_lock(&othercon->sock_mutex);
+ down_write(&othercon->sock_lock);
add_sock(newsock, othercon);
- addcon = othercon;
- mutex_unlock(&othercon->sock_mutex);
+
+ /* check if we received something while adding */
+ lock_sock(othercon->sock->sk);
+ lowcomms_queue_rwork(othercon);
+ release_sock(othercon->sock->sk);
+ up_write(&othercon->sock_lock);
}
else {
/* accept copies the sk after we've saved the callbacks, so we
don't want to save them a second time or comm errors will
result in calling sk_error_report recursively. */
add_sock(newsock, newcon);
- addcon = newcon;
- }
-
- set_bit(CF_CONNECTED, &addcon->flags);
- mutex_unlock(&newcon->sock_mutex);
-
- /*
- * Add it to the active queue in case we got data
- * between processing the accept adding the socket
- * to the read_sockets list
- */
- if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
- queue_work(recv_workqueue, &addcon->rwork);
+ /* check if we received something while adding */
+ lock_sock(newcon->sock->sk);
+ lowcomms_queue_rwork(newcon);
+ release_sock(newcon->sock->sk);
+ }
+ up_write(&newcon->sock_lock);
srcu_read_unlock(&connections_srcu, idx);
- return 0;
+ return DLM_IO_SUCCESS;
accept_err:
if (newsock)
sock_release(newsock);
- if (result != -EAGAIN)
- log_print("error accepting connection from node: %d", result);
return result;
}
@@ -1098,7 +1172,7 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
{
struct writequeue_entry *e;
- spin_lock(&con->writequeue_lock);
+ spin_lock_bh(&con->writequeue_lock);
if (!list_empty(&con->writequeue)) {
e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
if (DLM_WQ_REMAIN_BYTES(e) >= len) {
@@ -1127,7 +1201,7 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
list_add_tail(&e->list, &con->writequeue);
out:
- spin_unlock(&con->writequeue_lock);
+ spin_unlock_bh(&con->writequeue_lock);
return e;
};
@@ -1176,7 +1250,7 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
len < sizeof(struct dlm_header)) {
BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
log_print("failed to allocate a buffer of size %d", len);
- WARN_ON(1);
+ WARN_ON_ONCE(1);
return NULL;
}
@@ -1207,7 +1281,7 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
struct connection *con = e->con;
int users;
- spin_lock(&con->writequeue_lock);
+ spin_lock_bh(&con->writequeue_lock);
kref_get(&msg->ref);
list_add(&msg->list, &e->msgs);
@@ -1216,13 +1290,11 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
goto out;
e->len = DLM_WQ_LENGTH_BYTES(e);
- spin_unlock(&con->writequeue_lock);
- queue_work(send_workqueue, &con->swork);
- return;
+ lowcomms_queue_swork(con);
out:
- spin_unlock(&con->writequeue_lock);
+ spin_unlock_bh(&con->writequeue_lock);
return;
}
@@ -1244,7 +1316,7 @@ void dlm_lowcomms_put_msg(struct dlm_msg *msg)
kref_put(&msg->ref, dlm_msg_release);
}
-/* does not held connections_srcu, usage workqueue only */
+/* does not hold connections_srcu, used by lowcomms_error_report only */
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
{
struct dlm_msg *msg_resend;
@@ -1270,88 +1342,78 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
}
/* Send a message */
-static void send_to_sock(struct connection *con)
+static int send_to_sock(struct connection *con)
{
const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
struct writequeue_entry *e;
int len, offset, ret;
- int count;
-
-again:
- count = 0;
- mutex_lock(&con->sock_mutex);
- if (con->sock == NULL)
- goto out_connect;
-
- spin_lock(&con->writequeue_lock);
- for (;;) {
- e = con_next_wq(con);
- if (!e)
- break;
-
- len = e->len;
- offset = e->offset;
- BUG_ON(len == 0 && e->users == 0);
- spin_unlock(&con->writequeue_lock);
-
- ret = kernel_sendpage(con->sock, e->page, offset, len,
- msg_flags);
- trace_dlm_send(con->nodeid, ret);
- if (ret == -EAGAIN || ret == 0) {
- if (ret == -EAGAIN &&
- test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
- !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
- /* Notify TCP that we're limited by the
- * application window size.
- */
- set_bit(SOCK_NOSPACE, &con->sock->flags);
- con->sock->sk->sk_write_pending++;
- }
- cond_resched();
- goto out;
- } else if (ret < 0)
- goto out;
+ spin_lock_bh(&con->writequeue_lock);
+ e = con_next_wq(con);
+ if (!e) {
+ clear_bit(CF_SEND_PENDING, &con->flags);
+ spin_unlock_bh(&con->writequeue_lock);
+ return DLM_IO_END;
+ }
- spin_lock(&con->writequeue_lock);
- writequeue_entry_complete(e, ret);
+ len = e->len;
+ offset = e->offset;
+ WARN_ON_ONCE(len == 0 && e->users == 0);
+ spin_unlock_bh(&con->writequeue_lock);
- /* Don't starve people filling buffers */
- if (++count >= MAX_SEND_MSG_COUNT) {
- spin_unlock(&con->writequeue_lock);
- mutex_unlock(&con->sock_mutex);
- cond_resched();
- goto again;
+ ret = kernel_sendpage(con->sock, e->page, offset, len,
+ msg_flags);
+ trace_dlm_send(con->nodeid, ret);
+ if (ret == -EAGAIN || ret == 0) {
+ lock_sock(con->sock->sk);
+ spin_lock_bh(&con->writequeue_lock);
+ if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
+ !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
+ /* Notify TCP that we're limited by the
+ * application window size.
+ */
+ set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
+ con->sock->sk->sk_write_pending++;
+
+ clear_bit(CF_SEND_PENDING, &con->flags);
+ spin_unlock_bh(&con->writequeue_lock);
+ release_sock(con->sock->sk);
+
+ /* wait for write_space() event */
+ return DLM_IO_END;
}
+ spin_unlock_bh(&con->writequeue_lock);
+ release_sock(con->sock->sk);
+
+ return DLM_IO_RESCHED;
+ } else if (ret < 0) {
+ return ret;
}
- spin_unlock(&con->writequeue_lock);
-out:
- mutex_unlock(&con->sock_mutex);
- return;
+ spin_lock_bh(&con->writequeue_lock);
+ writequeue_entry_complete(e, ret);
+ spin_unlock_bh(&con->writequeue_lock);
-out_connect:
- mutex_unlock(&con->sock_mutex);
- queue_work(send_workqueue, &con->swork);
- cond_resched();
+ return DLM_IO_SUCCESS;
}
static void clean_one_writequeue(struct connection *con)
{
struct writequeue_entry *e, *safe;
- spin_lock(&con->writequeue_lock);
+ spin_lock_bh(&con->writequeue_lock);
list_for_each_entry_safe(e, safe, &con->writequeue, list) {
free_entry(e);
}
- spin_unlock(&con->writequeue_lock);
+ spin_unlock_bh(&con->writequeue_lock);
}
static void connection_release(struct rcu_head *rcu)
{
struct connection *con = container_of(rcu, struct connection, rcu);
- kfree(con->rx_buf);
+ WARN_ON_ONCE(!list_empty(&con->writequeue));
+ WARN_ON_ONCE(con->sock);
kfree(con);
}
@@ -1371,12 +1433,14 @@ int dlm_lowcomms_close(int nodeid)
return -ENOENT;
}
+ stop_connection_io(con);
+ log_print("io handling for node: %d stopped", nodeid);
+ close_connection(con, true);
+
spin_lock(&connections_lock);
hlist_del_rcu(&con->list);
spin_unlock(&connections_lock);
- close_connection(con, true, true, true);
-
clean_one_writequeue(con);
call_srcu(&connections_srcu, &con->rcu, connection_release);
if (con->othercon) {
@@ -1386,148 +1450,238 @@ int dlm_lowcomms_close(int nodeid)
}
srcu_read_unlock(&connections_srcu, idx);
+ /* for debugging we print when we are done, to compare with other
+ * messages in between. This function needs to be correctly
+ * synchronized with the io handling.
+ */
+ log_print("closing connection to node %d done", nodeid);
+
return 0;
}
-/* Receive workqueue function */
+/* Receive worker function */
static void process_recv_sockets(struct work_struct *work)
{
struct connection *con = container_of(work, struct connection, rwork);
+ int ret, buflen;
- clear_bit(CF_READ_PENDING, &con->flags);
- receive_from_sock(con);
+ down_read(&con->sock_lock);
+ if (!con->sock) {
+ up_read(&con->sock_lock);
+ return;
+ }
+
+ buflen = READ_ONCE(dlm_config.ci_buffer_size);
+ do {
+ ret = receive_from_sock(con, buflen);
+ } while (ret == DLM_IO_SUCCESS);
+ up_read(&con->sock_lock);
+
+ switch (ret) {
+ case DLM_IO_END:
+ /* CF_RECV_PENDING cleared */
+ break;
+ case DLM_IO_EOF:
+ close_connection(con, false);
+ /* CF_RECV_PENDING cleared */
+ break;
+ case DLM_IO_RESCHED:
+ cond_resched();
+ queue_work(io_workqueue, &con->rwork);
+ /* CF_RECV_PENDING not cleared */
+ break;
+ default:
+ if (ret < 0) {
+ if (test_bit(CF_IS_OTHERCON, &con->flags)) {
+ close_connection(con, false);
+ } else {
+ spin_lock_bh(&con->writequeue_lock);
+ lowcomms_queue_swork(con);
+ spin_unlock_bh(&con->writequeue_lock);
+ }
+
+ /* CF_RECV_PENDING is cleared for othercon;
+ * otherwise we trigger the send queue if not already done
+ * and process_send_sockets will handle it
+ */
+ break;
+ }
+
+ WARN_ON_ONCE(1);
+ break;
+ }
}
static void process_listen_recv_socket(struct work_struct *work)
{
int ret;
+ if (WARN_ON_ONCE(!listen_con.sock))
+ return;
+
do {
- ret = accept_from_sock(&listen_con);
- } while (!ret);
+ ret = accept_from_sock();
+ } while (ret == DLM_IO_SUCCESS);
+
+ if (ret < 0)
+ log_print("critical error accepting connection: %d", ret);
}
-static void dlm_connect(struct connection *con)
+static int dlm_connect(struct connection *con)
{
struct sockaddr_storage addr;
int result, addr_len;
struct socket *sock;
unsigned int mark;
- /* Some odd races can cause double-connects, ignore them */
- if (con->retries++ > MAX_CONNECT_RETRIES)
- return;
-
- if (con->sock) {
- log_print("node %d already connected.", con->nodeid);
- return;
- }
-
memset(&addr, 0, sizeof(addr));
result = nodeid_to_addr(con->nodeid, &addr, NULL,
dlm_proto_ops->try_new_addr, &mark);
if (result < 0) {
log_print("no address for nodeid %d", con->nodeid);
- return;
+ return result;
}
/* Create a socket to communicate with */
result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
SOCK_STREAM, dlm_proto_ops->proto, &sock);
if (result < 0)
- goto socket_err;
+ return result;
sock_set_mark(sock->sk, mark);
dlm_proto_ops->sockopts(sock);
- add_sock(sock, con);
-
result = dlm_proto_ops->bind(sock);
- if (result < 0)
- goto add_sock_err;
+ if (result < 0) {
+ sock_release(sock);
+ return result;
+ }
+
+ add_sock(sock, con);
log_print_ratelimited("connecting to %d", con->nodeid);
make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
addr_len);
- if (result < 0)
- goto add_sock_err;
-
- return;
-
-add_sock_err:
- dlm_close_sock(&con->sock);
+ switch (result) {
+ case -EINPROGRESS:
+ /* not an error */
+ fallthrough;
+ case 0:
+ break;
+ default:
+ if (result < 0)
+ dlm_close_sock(&con->sock);
-socket_err:
- /*
- * Some errors are fatal and this list might need adjusting. For other
- * errors we try again until the max number of retries is reached.
- */
- if (result != -EHOSTUNREACH &&
- result != -ENETUNREACH &&
- result != -ENETDOWN &&
- result != -EINVAL &&
- result != -EPROTONOSUPPORT) {
- log_print("connect %d try %d error %d", con->nodeid,
- con->retries, result);
- msleep(1000);
- lowcomms_connect_sock(con);
+ break;
}
+
+ return result;
}
-/* Send workqueue function */
+/* Send worker function */
static void process_send_sockets(struct work_struct *work)
{
struct connection *con = container_of(work, struct connection, swork);
+ int ret;
- WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));
+ WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));
+
+ down_read(&con->sock_lock);
+ if (!con->sock) {
+ up_read(&con->sock_lock);
+ down_write(&con->sock_lock);
+ if (!con->sock) {
+ ret = dlm_connect(con);
+ switch (ret) {
+ case 0:
+ break;
+ case -EINPROGRESS:
+ /* avoid spamming resched on connection;
+ * we might switch to a state_change
+ * event based mechanism once established
+ */
+ msleep(100);
+ break;
+ default:
+ /* CF_SEND_PENDING not cleared */
+ up_write(&con->sock_lock);
+ log_print("connect to node %d try %d error %d",
+ con->nodeid, con->retries++, ret);
+ msleep(1000);
+ /* For now we try forever to reconnect. In
+ * the future we should send an event to the
+ * cluster manager to fence itself after a
+ * certain number of retries.
+ */
+ queue_work(io_workqueue, &con->swork);
+ return;
+ }
+ }
+ downgrade_write(&con->sock_lock);
+ }
- clear_bit(CF_WRITE_PENDING, &con->flags);
+ do {
+ ret = send_to_sock(con);
+ } while (ret == DLM_IO_SUCCESS);
+ up_read(&con->sock_lock);
- if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
- close_connection(con, false, false, true);
- dlm_midcomms_unack_msg_resend(con->nodeid);
- }
+ switch (ret) {
+ case DLM_IO_END:
+ /* CF_SEND_PENDING cleared */
+ break;
+ case DLM_IO_RESCHED:
+ /* CF_SEND_PENDING not cleared */
+ cond_resched();
+ queue_work(io_workqueue, &con->swork);
+ break;
+ default:
+ if (ret < 0) {
+ close_connection(con, false);
- if (con->sock == NULL) {
- if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
- msleep(1000);
+ /* CF_SEND_PENDING cleared */
+ spin_lock_bh(&con->writequeue_lock);
+ lowcomms_queue_swork(con);
+ spin_unlock_bh(&con->writequeue_lock);
+ break;
+ }
- mutex_lock(&con->sock_mutex);
- dlm_connect(con);
- mutex_unlock(&con->sock_mutex);
+ WARN_ON_ONCE(1);
+ break;
}
-
- if (!list_empty(&con->writequeue))
- send_to_sock(con);
}
static void work_stop(void)
{
- if (recv_workqueue) {
- destroy_workqueue(recv_workqueue);
- recv_workqueue = NULL;
+ if (io_workqueue) {
+ destroy_workqueue(io_workqueue);
+ io_workqueue = NULL;
}
- if (send_workqueue) {
- destroy_workqueue(send_workqueue);
- send_workqueue = NULL;
+ if (process_workqueue) {
+ destroy_workqueue(process_workqueue);
+ process_workqueue = NULL;
}
}
static int work_start(void)
{
- recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM);
- if (!recv_workqueue) {
- log_print("can't start dlm_recv");
+ io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM,
+ 0);
+ if (!io_workqueue) {
+ log_print("can't start dlm_io");
return -ENOMEM;
}
- send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM);
- if (!send_workqueue) {
- log_print("can't start dlm_send");
- destroy_workqueue(recv_workqueue);
- recv_workqueue = NULL;
+	/* ordered dlm message processing queue;
+	 * should be converted to a tasklet
+	 */
+ process_workqueue = alloc_ordered_workqueue("dlm_process",
+ WQ_HIGHPRI | WQ_MEM_RECLAIM);
+ if (!process_workqueue) {
+ log_print("can't start dlm_process");
+ destroy_workqueue(io_workqueue);
+ io_workqueue = NULL;
return -ENOMEM;
}
@@ -1543,6 +1697,8 @@ void dlm_lowcomms_shutdown(void)
cancel_work_sync(&listen_con.rwork);
dlm_close_sock(&listen_con.sock);
+
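+	/* make sure all pending ordered dlm message processing is done */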
+ flush_workqueue(process_workqueue);
}
void dlm_lowcomms_shutdown_node(int nodeid, bool force)
@@ -1558,79 +1714,19 @@ void dlm_lowcomms_shutdown_node(int nodeid, bool force)
}
flush_work(&con->swork);
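+	/* stop any further io on this connection before closing it */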
+ stop_connection_io(con);
WARN_ON_ONCE(!force && !list_empty(&con->writequeue));
+ close_connection(con, true);
clean_one_writequeue(con);
if (con->othercon)
clean_one_writequeue(con->othercon);
- close_connection(con, true, true, true);
+ allow_connection_io(con);
srcu_read_unlock(&connections_srcu, idx);
}
-static void _stop_conn(struct connection *con, bool and_other)
-{
- mutex_lock(&con->sock_mutex);
- set_bit(CF_CLOSE, &con->flags);
- set_bit(CF_READ_PENDING, &con->flags);
- set_bit(CF_WRITE_PENDING, &con->flags);
- if (con->sock && con->sock->sk) {
- lock_sock(con->sock->sk);
- con->sock->sk->sk_user_data = NULL;
- release_sock(con->sock->sk);
- }
- if (con->othercon && and_other)
- _stop_conn(con->othercon, false);
- mutex_unlock(&con->sock_mutex);
-}
-
-static void stop_conn(struct connection *con)
-{
- _stop_conn(con, true);
-}
-
-static void free_conn(struct connection *con)
-{
- close_connection(con, true, true, true);
-}
-
-static void work_flush(void)
-{
- int ok;
- int i;
- struct connection *con;
-
- do {
- ok = 1;
- foreach_conn(stop_conn);
- if (recv_workqueue)
- flush_workqueue(recv_workqueue);
- if (send_workqueue)
- flush_workqueue(send_workqueue);
- for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
- hlist_for_each_entry_rcu(con, &connection_hash[i],
- list) {
- ok &= test_bit(CF_READ_PENDING, &con->flags);
- ok &= test_bit(CF_WRITE_PENDING, &con->flags);
- if (con->othercon) {
- ok &= test_bit(CF_READ_PENDING,
- &con->othercon->flags);
- ok &= test_bit(CF_WRITE_PENDING,
- &con->othercon->flags);
- }
- }
- }
- } while (!ok);
-}
-
void dlm_lowcomms_stop(void)
{
- int idx;
-
- idx = srcu_read_lock(&connections_srcu);
- work_flush();
- foreach_conn(free_conn);
- srcu_read_unlock(&connections_srcu, idx);
work_stop();
-
dlm_proto_ops = NULL;
}
@@ -1709,17 +1805,7 @@ static int dlm_tcp_bind(struct socket *sock)
static int dlm_tcp_connect(struct connection *con, struct socket *sock,
struct sockaddr *addr, int addr_len)
{
- int ret;
-
- ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
- switch (ret) {
- case -EINPROGRESS:
- fallthrough;
- case 0:
- return 0;
- }
-
- return ret;
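+	/* non-blocking connect, -EINPROGRESS is expected and handled
+	 * by the caller
+	 */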
+ return sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
}
static int dlm_tcp_listen_validate(void)
@@ -1784,13 +1870,7 @@ static int dlm_sctp_connect(struct connection *con, struct socket *sock,
sock_set_sndtimeo(sock->sk, 5);
ret = sock->ops->connect(sock, addr, addr_len, 0);
sock_set_sndtimeo(sock->sk, 0);
- if (ret < 0)
- return ret;
-
- if (!test_and_set_bit(CF_CONNECTED, &con->flags))
- log_print("connected to node %d", con->nodeid);
-
- return 0;
+ return ret;
}
static int dlm_sctp_listen_validate(void)
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index b0e8bdcaab1b..fc015a6abe17 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -306,11 +306,11 @@ static void dlm_send_queue_flush(struct midcomms_node *node)
pr_debug("flush midcomms send queue of node %d\n", node->nodeid);
rcu_read_lock();
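+	/* the send queue is also manipulated from bottom half context
+	 * now, hence the bh-safe lock variants
+	 */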
- spin_lock(&node->send_queue_lock);
+ spin_lock_bh(&node->send_queue_lock);
list_for_each_entry_rcu(mh, &node->send_queue, list) {
dlm_mhandle_delete(node, mh);
}
- spin_unlock(&node->send_queue_lock);
+ spin_unlock_bh(&node->send_queue_lock);
rcu_read_unlock();
}
@@ -437,7 +437,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
}
}
- spin_lock(&node->send_queue_lock);
+ spin_lock_bh(&node->send_queue_lock);
list_for_each_entry_rcu(mh, &node->send_queue, list) {
if (before(mh->seq, seq)) {
dlm_mhandle_delete(node, mh);
@@ -446,7 +446,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
break;
}
}
- spin_unlock(&node->send_queue_lock);
+ spin_unlock_bh(&node->send_queue_lock);
rcu_read_unlock();
}
@@ -890,12 +890,7 @@ static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
dlm_receive_buffer(p, nodeid);
}
-/*
- * Called from the low-level comms layer to process a buffer of
- * commands.
- */
-
-int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
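+/*
+ * Walk a received byte stream and return the number of bytes that
+ * form complete dlm messages; a trailing partial message is not
+ * counted and is left for the next receive pass.
+ */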
+int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len)
{
const unsigned char *ptr = buf;
const struct dlm_header *hd;
@@ -930,6 +925,32 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
if (msglen > len)
break;
+ ret += msglen;
+ len -= msglen;
+ ptr += msglen;
+ }
+
+ return ret;
+}
+
+/*
+ * Called from the low-level comms layer to process a buffer of
+ * commands.
+ */
+int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
+{
+ const unsigned char *ptr = buf;
+ const struct dlm_header *hd;
+ uint16_t msglen;
+ int ret = 0;
+
+ while (len >= sizeof(struct dlm_header)) {
+ hd = (struct dlm_header *)ptr;
+
+ msglen = le16_to_cpu(hd->h_length);
+ if (msglen > len)
+ break;
+
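+		/* dispatch the message according to the header version */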
switch (hd->h_version) {
case cpu_to_le32(DLM_VERSION_3_1):
dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
@@ -1046,9 +1067,9 @@ static void midcomms_new_msg_cb(void *data)
atomic_inc(&mh->node->send_queue_cnt);
- spin_lock(&mh->node->send_queue_lock);
+ spin_lock_bh(&mh->node->send_queue_lock);
list_add_tail_rcu(&mh->list, &mh->node->send_queue);
- spin_unlock(&mh->node->send_queue_lock);
+ spin_unlock_bh(&mh->node->send_queue_lock);
mh->seq = mh->node->seq_send++;
}
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index 69296552d5ad..bea1cee4279c 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -14,6 +14,7 @@
struct midcomms_node;
+int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len);
int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
gfp_t allocation, char **ppc);
--
2.31.1
Thread overview: 18 messages
2022-11-17 22:11 [Cluster-devel] [PATCH dlm/next 01/18] fs: dlm: avoid false-positive checker warning Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 02/18] fs: dlm: drop lkb ref in bug case Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 03/18] fs: dlm: ast do WARN_ON_ONCE() on hotpath Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 04/18] fs: dlm: rename DLM_IFL_NEED_SCHED to DLM_IFL_CB_PENDING Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 05/18] fs: dlm: rename seq to h_seq for msg tracing Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 06/18] fs: dlm: add dst nodeid " Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 07/18] fs: dlm: add midcomms init/start functions Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 08/18] fs: dlm: remove twice INIT_WORK Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 09/18] fs: dlm: use list_first_entry_or_null Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 10/18] fs: dlm: use listen sock as dlm running indicator Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 11/18] fs: dlm: remove socket shutdown handling Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 12/18] fs: dlm: cleanup listen sock handling Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 13/18] fs: dlm: don't put dlm_local_addrs on heap Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 14/18] fs: dlm: remove dlm_node_addrs lookup list Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 15/18] fs: dlm: use sock2con without checking null Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 16/18] fs: dlm: use saved sk_error_report() Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 17/18] fs: dlm: don't init error value Alexander Aring
2022-11-17 22:11 ` [Cluster-devel] [PATCH dlm/next 18/18] fs: dlm: parallelize lowcomms socket handling Alexander Aring