cluster-devel.redhat.com archive mirror
* [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails
@ 2022-10-27 20:45 Alexander Aring
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 02/18] fs: dlm: retry accept() until -EAGAIN or error returns Alexander Aring
                   ` (16 more replies)
  0 siblings, 17 replies; 20+ messages in thread
From: Alexander Aring @ 2022-10-27 20:45 UTC
  To: cluster-devel.redhat.com

This patch fixes a double sock_release() call when listen() fails for
the dlm lowcomms listen socket. The caller of dlm_listen_for_all()
should never have to release the socket if dlm_listen_for_all() fails;
the socket is now released only once, when listen() fails.
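
The ownership rule described here can be modelled in userspace. The
following is only a minimal sketch, not the dlm code: struct fake_sock,
close_sock(), listen_for_all() and lowcomms_start() are hypothetical
stand-ins, and the listen() result is passed in as a parameter so the
failure path can be exercised.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a socket; counts how often it was released. */
struct fake_sock {
	int released;
};

/* Models the global listen socket pointer (assumption for this sketch). */
static struct fake_sock *listen_sock;

/* Release the socket and clear the pointer so it cannot be reused. */
static void close_sock(struct fake_sock **sock)
{
	assert(sock != NULL);
	if (*sock) {
		(*sock)->released++;
		*sock = NULL;
	}
}

/* Owns the socket on failure: cleans up once and returns the error. */
static int listen_for_all(struct fake_sock *sock, int listen_result)
{
	listen_sock = sock;
	if (listen_result < 0) {
		close_sock(&listen_sock);
		return listen_result;
	}
	return 0;
}

/* The caller only propagates the error and never touches the socket. */
static int lowcomms_start(struct fake_sock *sock, int listen_result)
{
	return listen_for_all(sock, listen_result);
}
```

Calling lowcomms_start() with a negative listen result leaves the
release counter at exactly one, because only the function that set up
the socket cleans it up.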

Cc: stable@vger.kernel.org
Fixes: 2dc6b1158c28 ("fs: dlm: introduce generic listen")
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 59f64c596233..2cb9f3b49e05 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1820,7 +1820,7 @@ static int dlm_listen_for_all(void)
 	result = sock->ops->listen(sock, 5);
 	if (result < 0) {
 		dlm_close_sock(&listen_con.sock);
-		goto out;
+		return result;
 	}
 
 	return 0;
@@ -2023,7 +2023,6 @@ int dlm_lowcomms_start(void)
 	dlm_proto_ops = NULL;
 fail_proto_ops:
 	dlm_allow_conn = 0;
-	dlm_close_sock(&listen_con.sock);
 	work_stop();
 fail_local:
 	deinit_local();
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 02/18] fs: dlm: retry accept() until -EAGAIN or error returns
From: Alexander Aring @ 2022-10-27 20:45 UTC
  To: cluster-devel.redhat.com

This patch fixes a race that occurs if we get a socket data ready event
twice while the listen connection worker is queued. Currently the event
is served only once, but we need to call accept (in this case twice)
until we hit -EAGAIN, which tells us there is no pending accept left.

This patch wraps the accept call in a do-while loop that runs until we
receive a return value different from 0, as was done before commit
d11ccd451b65 ("fs: dlm: listen socket out of connection hash").
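
The drain-until-EAGAIN pattern can be tried in isolation. This is a
userspace sketch under stated assumptions: struct fake_listener,
accept_one() and listen_worker() are hypothetical names, and the
backlog is modelled as a plain counter rather than a real socket.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical listen state: connections waiting and already accepted. */
struct fake_listener {
	int pending;
	int accepted;
};

/* Accept one pending connection; 0 on success, -EAGAIN if none is left. */
static int accept_one(struct fake_listener *l)
{
	assert(l != NULL);
	if (l->pending == 0)
		return -EAGAIN;
	l->pending--;
	l->accepted++;
	return 0;
}

/* Worker body: keep accepting until accept_one() returns non-zero. */
static int listen_worker(struct fake_listener *l)
{
	int ret;

	do {
		ret = accept_one(l);
	} while (!ret);

	return ret;
}
```

With two pending connections and a single worker run, both are accepted
before the loop stops on -EAGAIN, which is the case the single accept
call missed.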

Cc: stable@vger.kernel.org
Fixes: d11ccd451b65 ("fs: dlm: listen socket out of connection hash")
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 2cb9f3b49e05..871d4e9f49fb 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1543,7 +1543,11 @@ static void process_recv_sockets(struct work_struct *work)
 
 static void process_listen_recv_socket(struct work_struct *work)
 {
-	accept_from_sock(&listen_con);
+	int ret;
+
+	do {
+		ret = accept_from_sock(&listen_con);
+	} while (!ret);
 }
 
 static void dlm_connect(struct connection *con)
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 03/18] fs: dlm: remove send repeat remove handling
From: Alexander Aring @ 2022-10-27 20:45 UTC
  To: cluster-devel.redhat.com

This patch removes the send repeat remove handling. This handling is
there to repeatedly send DLM_MSG_REMOVE messages in cases where the dlm
stack thinks the message was not received the first time. If messages
can be dropped this functionality is necessary, but since the DLM
midcomms layer guarantees there are no message drops between cluster
nodes, this feature is no longer strictly necessary. Due to message
delays/processing it could be that send_repeat_remove() sends out
another remove while the previous one is still on its way. We remove
the repeat remove handling because we are sure that the message cannot
be dropped due to communication errors.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c | 74 ---------------------------------------------------
 1 file changed, 74 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 94a72ede5764..b246d71b5e17 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -4044,66 +4044,6 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
 	return error;
 }
 
-static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
-{
-	char name[DLM_RESNAME_MAXLEN + 1];
-	struct dlm_message *ms;
-	struct dlm_mhandle *mh;
-	struct dlm_rsb *r;
-	uint32_t hash, b;
-	int rv, dir_nodeid;
-
-	memset(name, 0, sizeof(name));
-	memcpy(name, ms_name, len);
-
-	hash = jhash(name, len, 0);
-	b = hash & (ls->ls_rsbtbl_size - 1);
-
-	dir_nodeid = dlm_hash2nodeid(ls, hash);
-
-	log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
-
-	spin_lock(&ls->ls_rsbtbl[b].lock);
-	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
-	if (!rv) {
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
-		log_error(ls, "repeat_remove on keep %s", name);
-		return;
-	}
-
-	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-	if (!rv) {
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
-		log_error(ls, "repeat_remove on toss %s", name);
-		return;
-	}
-
-	/* use ls->remove_name2 to avoid conflict with shrink? */
-
-	spin_lock(&ls->ls_remove_spin);
-	ls->ls_remove_len = len;
-	memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
-	spin_unlock(&ls->ls_remove_spin);
-	spin_unlock(&ls->ls_rsbtbl[b].lock);
-
-	rv = _create_message(ls, sizeof(struct dlm_message) + len,
-			     dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
-	if (rv)
-		goto out;
-
-	memcpy(ms->m_extra, name, len);
-	ms->m_hash = cpu_to_le32(hash);
-
-	send_message(mh, ms);
-
-out:
-	spin_lock(&ls->ls_remove_spin);
-	ls->ls_remove_len = 0;
-	memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
-	spin_unlock(&ls->ls_remove_spin);
-	wake_up(&ls->ls_remove_wait);
-}
-
 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
 {
 	struct dlm_lkb *lkb;
@@ -4173,25 +4113,11 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
 	   ENOTBLK request failures when the lookup reply designating us
 	   as master is delayed. */
 
-	/* We could repeatedly return -EBADR here if our send_remove() is
-	   delayed in being sent/arriving/being processed on the dir node.
-	   Another node would repeatedly lookup up the master, and the dir
-	   node would continue returning our nodeid until our send_remove
-	   took effect.
-
-	   We send another remove message in case our previous send_remove
-	   was lost/ignored/missed somehow. */
-
 	if (error != -ENOTBLK) {
 		log_limit(ls, "receive_request %x from %d %d",
 			  le32_to_cpu(ms->m_lkid), from_nodeid, error);
 	}
 
-	if (namelen && error == -EBADR) {
-		send_repeat_remove(ls, ms->m_extra, namelen);
-		msleep(1000);
-	}
-
 	setup_stub_lkb(ls, ms);
 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
 	return error;
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 04/18] fs: dlm: use packet in dlm_mhandle
From: Alexander Aring @ 2022-10-27 20:45 UTC
  To: cluster-devel.redhat.com

To allow more than just dereferencing the inner header we directly point
to the inner dlm packet which allows us to dereference the header, rcom
or message structure.
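
Why a union pointer is more useful than a header pointer can be shown
with a reduced model. The types below (struct hdr, struct msg, struct
rcom, union packet) and the CMD_* values are simplified, hypothetical
stand-ins and do not reproduce the real dlm wire structures.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-ins; every variant starts with the same header. */
struct hdr {
	uint8_t cmd;
};

struct msg {
	struct hdr header;
	uint32_t type;
};

struct rcom {
	struct hdr header;
	uint32_t rc_type;
};

/* One pointer type that gives access to all three views of a packet. */
union packet {
	struct hdr header;
	struct msg message;
	struct rcom rcom;
};

enum { CMD_MSG = 1, CMD_RCOM = 2 };

/* Pick the variant via the shared header, then read its own field. */
static uint32_t packet_type(const union packet *p)
{
	assert(p != NULL);
	switch (p->header.cmd) {
	case CMD_MSG:
		return p->message.type;
	case CMD_RCOM:
		return p->rcom.rc_type;
	default:
		return 0;
	}
}
```

A pointer to struct hdr alone would need a cast at every such call
site; the union keeps the casts in one place.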

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/midcomms.c | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 6489bc22ad61..4eed40d66da1 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -194,7 +194,7 @@ struct midcomms_node {
 };
 
 struct dlm_mhandle {
-	const struct dlm_header *inner_hd;
+	const union dlm_packet *inner_p;
 	struct midcomms_node *node;
 	struct dlm_opts *opts;
 	struct dlm_msg *msg;
@@ -1055,7 +1055,7 @@ static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int node
 	dlm_fill_opts_header(opts, len, mh->seq);
 
 	*ppc += sizeof(*opts);
-	mh->inner_hd = (const struct dlm_header *)*ppc;
+	mh->inner_p = (const union dlm_packet *)*ppc;
 	return msg;
 }
 
@@ -1133,7 +1133,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh)
 {
 	/* nexthdr chain for fast lookup */
-	mh->opts->o_nextcmd = mh->inner_hd->h_cmd;
+	mh->opts->o_nextcmd = mh->inner_p->header.h_cmd;
 	mh->committed = true;
 	dlm_lowcomms_commit_msg(mh->msg);
 }
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 05/18] fd: dlm: trace send/recv of dlm message and rcom
From: Alexander Aring @ 2022-10-27 20:45 UTC
  To: cluster-devel.redhat.com

This patch adds tracepoints for the send and recv cases of dlm messages
and dlm rcom messages. When sending a dlm message we add the dlm rsb
resource name this dlm message belongs to. This has the advantage that
dlm messages can be followed on a per-lock basis. For a received
message the resource name can be found by following the sequence number
of the matching send message.

The dlm message DLM_MSG_PURGE doesn't belong to a lock request and will
not set the resource name in a dlm_message trace. The same applies to
all rcom messages.

This debugging functionality requires some additional handling, which
is kept as small as possible. The midcomms layer also becomes aware of
lock resource names; for now this is required to make a connection
between sequence numbers and lock resource names. It is for debugging
purposes only.
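
Matching a recv trace to a resource name via the sequence number could
be done in a post-processing step roughly like this. It is a
hypothetical sketch, not part of the patch: struct send_log, log_send()
and name_for_recv() are invented for illustration, and it assumes the
sequence numbers in the captured log are unique (in practice they would
have to be matched per node pair).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MAX_EVENTS	16
#define RES_NAME_LEN	64

/* One recorded send trace: sequence number plus resource name. */
struct send_event {
	uint32_t seq;
	char name[RES_NAME_LEN + 1];
};

struct send_log {
	struct send_event ev[MAX_EVENTS];
	int count;
};

/* Record a send trace; name may be NULL when namelen is 0. */
static int log_send(struct send_log *log, uint32_t seq,
		    const void *name, int namelen)
{
	struct send_event *e;

	assert(log != NULL);
	if (log->count >= MAX_EVENTS || namelen < 0 ||
	    namelen > RES_NAME_LEN)
		return -1;

	e = &log->ev[log->count++];
	e->seq = seq;
	if (namelen > 0)
		memcpy(e->name, name, (size_t)namelen);
	e->name[namelen] = '\0';
	return 0;
}

/* Look up the resource name for a recv trace by its sequence number. */
static const char *name_for_recv(const struct send_log *log, uint32_t seq)
{
	int i;

	for (i = 0; i < log->count; i++) {
		if (log->ev[i].seq == seq)
			return log->ev[i].name;
	}
	return NULL;
}
```

A send without a resource name, such as a purge, is stored with an
empty name, so the lookup still succeeds for its sequence number.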

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c              |  21 +--
 fs/dlm/midcomms.c          |  45 +++++-
 fs/dlm/midcomms.h          |   3 +-
 fs/dlm/rcom.c              |   4 +-
 include/trace/events/dlm.h | 297 +++++++++++++++++++++++++++++++++++++
 5 files changed, 353 insertions(+), 17 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index b246d71b5e17..0b1bc24536ce 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3611,9 +3611,10 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
 /* further lowcomms enhancements or alternate implementations may make
    the return value from this function useful at some point */
 
-static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
+static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
+			const void *name, int namelen)
 {
-	dlm_midcomms_commit_mhandle(mh);
+	dlm_midcomms_commit_mhandle(mh, name, namelen);
 	return 0;
 }
 
@@ -3679,7 +3680,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
 
 	send_args(r, lkb, ms);
 
-	error = send_message(mh, ms);
+	error = send_message(mh, ms, r->res_name, r->res_length);
 	if (error)
 		goto fail;
 	return 0;
@@ -3742,7 +3743,7 @@ static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
 	ms->m_result = 0;
 
-	error = send_message(mh, ms);
+	error = send_message(mh, ms, r->res_name, r->res_length);
  out:
 	return error;
 }
@@ -3763,7 +3764,7 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
 
 	ms->m_bastmode = cpu_to_le32(mode);
 
-	error = send_message(mh, ms);
+	error = send_message(mh, ms, r->res_name, r->res_length);
  out:
 	return error;
 }
@@ -3786,7 +3787,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
 	send_args(r, lkb, ms);
 
-	error = send_message(mh, ms);
+	error = send_message(mh, ms, r->res_name, r->res_length);
 	if (error)
 		goto fail;
 	return 0;
@@ -3811,7 +3812,7 @@ static int send_remove(struct dlm_rsb *r)
 	memcpy(ms->m_extra, r->res_name, r->res_length);
 	ms->m_hash = cpu_to_le32(r->res_hash);
 
-	error = send_message(mh, ms);
+	error = send_message(mh, ms, r->res_name, r->res_length);
  out:
 	return error;
 }
@@ -3833,7 +3834,7 @@ static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 
 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
 
-	error = send_message(mh, ms);
+	error = send_message(mh, ms, r->res_name, r->res_length);
  out:
 	return error;
 }
@@ -3874,7 +3875,7 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
 	ms->m_nodeid = cpu_to_le32(ret_nodeid);
 
-	error = send_message(mh, ms);
+	error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
  out:
 	return error;
 }
@@ -6300,7 +6301,7 @@ static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
 	ms->m_nodeid = cpu_to_le32(nodeid);
 	ms->m_pid = cpu_to_le32(pid);
 
-	return send_message(mh, ms);
+	return send_message(mh, ms, NULL, 0);
 }
 
 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 4eed40d66da1..e10a6e97df44 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -132,6 +132,7 @@
  */
 #define DLM_DEBUG_FENCE_TERMINATION	0
 
+#include <trace/events/dlm.h>
 #include <net/tcp.h>
 
 #include "dlm_internal.h"
@@ -415,7 +416,7 @@ static int dlm_send_fin(struct midcomms_node *node,
 	m_header->h_cmd = DLM_FIN;
 
 	pr_debug("sending fin msg to node %d\n", node->nodeid);
-	dlm_midcomms_commit_mhandle(mh);
+	dlm_midcomms_commit_mhandle(mh, NULL, 0);
 	set_bit(DLM_NODE_FLAG_STOP_TX, &node->flags);
 
 	return 0;
@@ -474,6 +475,20 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
 	spin_unlock(&node->state_lock);
 }
 
+static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p)
+{
+	switch (p->header.h_cmd) {
+	case DLM_MSG:
+		trace_dlm_recv_message(seq, &p->message);
+		break;
+	case DLM_RCOM:
+		trace_dlm_recv_rcom(seq, &p->rcom);
+		break;
+	default:
+		break;
+	}
+}
+
 static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 					struct midcomms_node *node,
 					uint32_t seq)
@@ -534,6 +549,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 			break;
 		default:
 			WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
+			dlm_receive_buffer_3_2_trace(seq, p);
 			dlm_receive_buffer(p, node->nodeid);
 			set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
 			break;
@@ -1130,11 +1146,30 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 }
 #endif
 
-static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh)
+static void dlm_midcomms_commit_msg_3_2_trace(const struct dlm_mhandle *mh,
+					      const void *name, int namelen)
+{
+	switch (mh->inner_p->header.h_cmd) {
+	case DLM_MSG:
+		trace_dlm_send_message(mh->seq, &mh->inner_p->message,
+				       name, namelen);
+		break;
+	case DLM_RCOM:
+		trace_dlm_send_rcom(mh->seq, &mh->inner_p->rcom);
+		break;
+	default:
+		/* nothing to trace */
+		break;
+	}
+}
+
+static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh,
+					const void *name, int namelen)
 {
 	/* nexthdr chain for fast lookup */
 	mh->opts->o_nextcmd = mh->inner_p->header.h_cmd;
 	mh->committed = true;
+	dlm_midcomms_commit_msg_3_2_trace(mh, name, namelen);
 	dlm_lowcomms_commit_msg(mh->msg);
 }
 
@@ -1142,8 +1177,10 @@ static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh)
  * dlm_midcomms_get_mhandle
  */
 #ifndef __CHECKER__
-void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh)
+void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh,
+				 const void *name, int namelen)
 {
+
 	switch (mh->node->version) {
 	case DLM_VERSION_3_1:
 		srcu_read_unlock(&nodes_srcu, mh->idx);
@@ -1154,7 +1191,7 @@ void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh)
 		dlm_free_mhandle(mh);
 		break;
 	case DLM_VERSION_3_2:
-		dlm_midcomms_commit_msg_3_2(mh);
+		dlm_midcomms_commit_msg_3_2(mh, name, namelen);
 		srcu_read_unlock(&nodes_srcu, mh->idx);
 		break;
 	default:
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index 82bcd9661922..d6286e80b077 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -17,7 +17,8 @@ struct midcomms_node;
 int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
 struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 					     gfp_t allocation, char **ppc);
-void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh);
+void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
+				 int namelen);
 int dlm_midcomms_close(int nodeid);
 int dlm_midcomms_start(void);
 void dlm_midcomms_shutdown(void);
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index f19860315043..b76d52e2f6bd 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -91,7 +91,7 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
 
 static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc)
 {
-	dlm_midcomms_commit_mhandle(mh);
+	dlm_midcomms_commit_mhandle(mh, NULL, 0);
 }
 
 static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc)
@@ -516,7 +516,7 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 	rf = (struct rcom_config *) rc->rc_buf;
 	rf->rf_lvblen = cpu_to_le32(~0U);
 
-	dlm_midcomms_commit_mhandle(mh);
+	dlm_midcomms_commit_mhandle(mh, NULL, 0);
 
 	return 0;
 }
diff --git a/include/trace/events/dlm.h b/include/trace/events/dlm.h
index da0eaae98fa3..4ec47828d55e 100644
--- a/include/trace/events/dlm.h
+++ b/include/trace/events/dlm.h
@@ -46,6 +46,56 @@
 	{ DLM_SBF_VALNOTVALID,	"VALNOTVALID" },		\
 	{ DLM_SBF_ALTMODE,	"ALTMODE" })
 
+#define show_lkb_flags(flags) __print_flags(flags, "|",		\
+	{ DLM_IFL_MSTCPY,	"MSTCPY" },			\
+	{ DLM_IFL_RESEND,	"RESEND" },			\
+	{ DLM_IFL_DEAD,		"DEAD" },			\
+	{ DLM_IFL_OVERLAP_UNLOCK, "OVERLAP_UNLOCK" },		\
+	{ DLM_IFL_OVERLAP_CANCEL, "OVERLAP_CANCEL" },		\
+	{ DLM_IFL_ENDOFLIFE,	"ENDOFLIFE" },			\
+	{ DLM_IFL_DEADLOCK_CANCEL, "DEADLOCK_CANCEL" },		\
+	{ DLM_IFL_STUB_MS,	"STUB_MS" },			\
+	{ DLM_IFL_USER,		"USER" },			\
+	{ DLM_IFL_ORPHAN,	"ORPHAN" })
+
+#define show_header_cmd(cmd) __print_symbolic(cmd,		\
+	{ DLM_MSG,		"MSG"},				\
+	{ DLM_RCOM,		"RCOM"},			\
+	{ DLM_OPTS,		"OPTS"},			\
+	{ DLM_ACK,		"ACK"},				\
+	{ DLM_FIN,		"FIN"})
+
+#define show_message_version(version) __print_symbolic(version,	\
+	{ DLM_VERSION_3_1,	"3.1"},				\
+	{ DLM_VERSION_3_2,	"3.2"})
+
+#define show_message_type(type) __print_symbolic(type,		\
+	{ DLM_MSG_REQUEST,	"REQUEST"},			\
+	{ DLM_MSG_CONVERT,	"CONVERT"},			\
+	{ DLM_MSG_UNLOCK,	"UNLOCK"},			\
+	{ DLM_MSG_CANCEL,	"CANCEL"},			\
+	{ DLM_MSG_REQUEST_REPLY, "REQUEST_REPLY"},		\
+	{ DLM_MSG_CONVERT_REPLY, "CONVERT_REPLY"},		\
+	{ DLM_MSG_UNLOCK_REPLY,	"UNLOCK_REPLY"},		\
+	{ DLM_MSG_CANCEL_REPLY,	"CANCEL_REPLY"},		\
+	{ DLM_MSG_GRANT,	"GRANT"},			\
+	{ DLM_MSG_BAST,		"BAST"},			\
+	{ DLM_MSG_LOOKUP,	"LOOKUP"},			\
+	{ DLM_MSG_REMOVE,	"REMOVE"},			\
+	{ DLM_MSG_LOOKUP_REPLY,	"LOOKUP_REPLY"},		\
+	{ DLM_MSG_PURGE,	"PURGE"})
+
+#define show_rcom_type(type) __print_symbolic(type,            \
+	{ DLM_RCOM_STATUS,              "STATUS"},              \
+	{ DLM_RCOM_NAMES,               "NAMES"},               \
+	{ DLM_RCOM_LOOKUP,              "LOOKUP"},              \
+	{ DLM_RCOM_LOCK,                "LOCK"},                \
+	{ DLM_RCOM_STATUS_REPLY,        "STATUS_REPLY"},        \
+	{ DLM_RCOM_NAMES_REPLY,         "NAMES_REPLY"},         \
+	{ DLM_RCOM_LOOKUP_REPLY,        "LOOKUP_REPLY"},        \
+	{ DLM_RCOM_LOCK_REPLY,          "LOCK_REPLY"})
+
+
 /* note: we begin tracing dlm_lock_start() only if ls and lkb are found */
 TRACE_EVENT(dlm_lock_start,
 
@@ -290,6 +340,253 @@ TRACE_EVENT(dlm_unlock_end,
 
 );
 
+DECLARE_EVENT_CLASS(dlm_rcom_template,
+
+	TP_PROTO(uint32_t seq, const struct dlm_rcom *rc),
+
+	TP_ARGS(seq, rc),
+
+	TP_STRUCT__entry(
+		__field(uint32_t, seq)
+		__field(uint32_t, h_version)
+		__field(uint32_t, h_lockspace)
+		__field(uint32_t, h_nodeid)
+		__field(uint16_t, h_length)
+		__field(uint8_t, h_cmd)
+		__field(uint32_t, rc_type)
+		__field(int32_t, rc_result)
+		__field(uint64_t, rc_id)
+		__field(uint64_t, rc_seq)
+		__field(uint64_t, rc_seq_reply)
+		__dynamic_array(unsigned char, rc_buf,
+				le16_to_cpu(rc->rc_header.h_length) - sizeof(*rc))
+	),
+
+	TP_fast_assign(
+		__entry->seq = seq;
+		__entry->h_version = le32_to_cpu(rc->rc_header.h_version);
+		__entry->h_lockspace = le32_to_cpu(rc->rc_header.u.h_lockspace);
+		__entry->h_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
+		__entry->h_length = le16_to_cpu(rc->rc_header.h_length);
+		__entry->h_cmd = rc->rc_header.h_cmd;
+		__entry->rc_type = le32_to_cpu(rc->rc_type);
+		__entry->rc_result = le32_to_cpu(rc->rc_result);
+		__entry->rc_id = le64_to_cpu(rc->rc_id);
+		__entry->rc_seq = le64_to_cpu(rc->rc_seq);
+		__entry->rc_seq_reply = le64_to_cpu(rc->rc_seq_reply);
+		memcpy(__get_dynamic_array(rc_buf), rc->rc_buf,
+		       __get_dynamic_array_len(rc_buf));
+	),
+
+	TP_printk("seq=%u, h_version=%s h_lockspace=%u h_nodeid=%u "
+		  "h_length=%u h_cmd=%s rc_type=%s rc_result=%d "
+		  "rc_id=%llu rc_seq=%llu rc_seq_reply=%llu "
+		  "rc_buf=0x%s", __entry->seq,
+		  show_message_version(__entry->h_version),
+		  __entry->h_lockspace, __entry->h_nodeid, __entry->h_length,
+		  show_header_cmd(__entry->h_cmd),
+		  show_rcom_type(__entry->rc_type),
+		  __entry->rc_result, __entry->rc_id, __entry->rc_seq,
+		  __entry->rc_seq_reply,
+		  __print_hex_str(__get_dynamic_array(rc_buf),
+				  __get_dynamic_array_len(rc_buf)))
+
+);
+
+DEFINE_EVENT(dlm_rcom_template, dlm_send_rcom,
+	     TP_PROTO(uint32_t seq, const struct dlm_rcom *rc),
+	     TP_ARGS(seq, rc));
+
+DEFINE_EVENT(dlm_rcom_template, dlm_recv_rcom,
+	     TP_PROTO(uint32_t seq, const struct dlm_rcom *rc),
+	     TP_ARGS(seq, rc));
+
+TRACE_EVENT(dlm_send_message,
+
+	TP_PROTO(uint32_t seq, const struct dlm_message *ms,
+		 const void *name, int namelen),
+
+	TP_ARGS(seq, ms, name, namelen),
+
+	TP_STRUCT__entry(
+		__field(uint32_t, seq)
+		__field(uint32_t, h_version)
+		__field(uint32_t, h_lockspace)
+		__field(uint32_t, h_nodeid)
+		__field(uint16_t, h_length)
+		__field(uint8_t, h_cmd)
+		__field(uint32_t, m_type)
+		__field(uint32_t, m_nodeid)
+		__field(uint32_t, m_pid)
+		__field(uint32_t, m_lkid)
+		__field(uint32_t, m_remid)
+		__field(uint32_t, m_parent_lkid)
+		__field(uint32_t, m_parent_remid)
+		__field(uint32_t, m_exflags)
+		__field(uint32_t, m_sbflags)
+		__field(uint32_t, m_flags)
+		__field(uint32_t, m_lvbseq)
+		__field(uint32_t, m_hash)
+		__field(int32_t, m_status)
+		__field(int32_t, m_grmode)
+		__field(int32_t, m_rqmode)
+		__field(int32_t, m_bastmode)
+		__field(int32_t, m_asts)
+		__field(int32_t, m_result)
+		__dynamic_array(unsigned char, m_extra,
+				le16_to_cpu(ms->m_header.h_length) - sizeof(*ms))
+		__dynamic_array(unsigned char, res_name, namelen)
+	),
+
+	TP_fast_assign(
+		__entry->seq = seq;
+		__entry->h_version = le32_to_cpu(ms->m_header.h_version);
+		__entry->h_lockspace = le32_to_cpu(ms->m_header.u.h_lockspace);
+		__entry->h_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
+		__entry->h_length = le16_to_cpu(ms->m_header.h_length);
+		__entry->h_cmd = ms->m_header.h_cmd;
+		__entry->m_type = le32_to_cpu(ms->m_type);
+		__entry->m_nodeid = le32_to_cpu(ms->m_nodeid);
+		__entry->m_pid = le32_to_cpu(ms->m_pid);
+		__entry->m_lkid = le32_to_cpu(ms->m_lkid);
+		__entry->m_remid = le32_to_cpu(ms->m_remid);
+		__entry->m_parent_lkid = le32_to_cpu(ms->m_parent_lkid);
+		__entry->m_parent_remid = le32_to_cpu(ms->m_parent_remid);
+		__entry->m_exflags = le32_to_cpu(ms->m_exflags);
+		__entry->m_sbflags = le32_to_cpu(ms->m_sbflags);
+		__entry->m_flags = le32_to_cpu(ms->m_flags);
+		__entry->m_lvbseq = le32_to_cpu(ms->m_lvbseq);
+		__entry->m_hash = le32_to_cpu(ms->m_hash);
+		__entry->m_status = le32_to_cpu(ms->m_status);
+		__entry->m_grmode = le32_to_cpu(ms->m_grmode);
+		__entry->m_rqmode = le32_to_cpu(ms->m_rqmode);
+		__entry->m_bastmode = le32_to_cpu(ms->m_bastmode);
+		__entry->m_asts = le32_to_cpu(ms->m_asts);
+		__entry->m_result = le32_to_cpu(ms->m_result);
+		memcpy(__get_dynamic_array(m_extra), ms->m_extra,
+		       __get_dynamic_array_len(m_extra));
+		memcpy(__get_dynamic_array(res_name), name,
+		       __get_dynamic_array_len(res_name));
+	),
+
+	TP_printk("seq=%u h_version=%s h_lockspace=%u h_nodeid=%u "
+		  "h_length=%u h_cmd=%s m_type=%s m_nodeid=%u "
+		  "m_pid=%u m_lkid=%u m_remid=%u m_parent_lkid=%u "
+		  "m_parent_remid=%u m_exflags=%s m_sbflags=%s m_flags=%s "
+		  "m_lvbseq=%u m_hash=%u m_status=%d m_grmode=%s "
+		  "m_rqmode=%s m_bastmode=%s m_asts=%d m_result=%d "
+		  "m_extra=0x%s res_name=0x%s",
+		  __entry->seq, show_message_version(__entry->h_version),
+		  __entry->h_lockspace, __entry->h_nodeid, __entry->h_length,
+		  show_header_cmd(__entry->h_cmd),
+		  show_message_type(__entry->m_type),
+		  __entry->m_nodeid, __entry->m_pid, __entry->m_lkid,
+		  __entry->m_remid, __entry->m_parent_lkid,
+		  __entry->m_parent_remid, show_lock_flags(__entry->m_exflags),
+		  show_dlm_sb_flags(__entry->m_sbflags),
+		  show_lkb_flags(__entry->m_flags), __entry->m_lvbseq,
+		  __entry->m_hash, __entry->m_status,
+		  show_lock_mode(__entry->m_grmode),
+		  show_lock_mode(__entry->m_rqmode),
+		  show_lock_mode(__entry->m_bastmode),
+		  __entry->m_asts, __entry->m_result,
+		  __print_hex_str(__get_dynamic_array(m_extra),
+				  __get_dynamic_array_len(m_extra)),
+		  __print_hex_str(__get_dynamic_array(res_name),
+				  __get_dynamic_array_len(res_name)))
+
+);
+
+TRACE_EVENT(dlm_recv_message,
+
+	TP_PROTO(uint32_t seq, const struct dlm_message *ms),
+
+	TP_ARGS(seq, ms),
+
+	TP_STRUCT__entry(
+		__field(uint32_t, seq)
+		__field(uint32_t, h_version)
+		__field(uint32_t, h_lockspace)
+		__field(uint32_t, h_nodeid)
+		__field(uint16_t, h_length)
+		__field(uint8_t, h_cmd)
+		__field(uint32_t, m_type)
+		__field(uint32_t, m_nodeid)
+		__field(uint32_t, m_pid)
+		__field(uint32_t, m_lkid)
+		__field(uint32_t, m_remid)
+		__field(uint32_t, m_parent_lkid)
+		__field(uint32_t, m_parent_remid)
+		__field(uint32_t, m_exflags)
+		__field(uint32_t, m_sbflags)
+		__field(uint32_t, m_flags)
+		__field(uint32_t, m_lvbseq)
+		__field(uint32_t, m_hash)
+		__field(int32_t, m_status)
+		__field(int32_t, m_grmode)
+		__field(int32_t, m_rqmode)
+		__field(int32_t, m_bastmode)
+		__field(int32_t, m_asts)
+		__field(int32_t, m_result)
+		__dynamic_array(unsigned char, m_extra,
+				le16_to_cpu(ms->m_header.h_length) - sizeof(*ms))
+	),
+
+	TP_fast_assign(
+		__entry->seq = seq;
+		__entry->h_version = le32_to_cpu(ms->m_header.h_version);
+		__entry->h_lockspace = le32_to_cpu(ms->m_header.u.h_lockspace);
+		__entry->h_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
+		__entry->h_length = le16_to_cpu(ms->m_header.h_length);
+		__entry->h_cmd = ms->m_header.h_cmd;
+		__entry->m_type = le32_to_cpu(ms->m_type);
+		__entry->m_nodeid = le32_to_cpu(ms->m_nodeid);
+		__entry->m_pid = le32_to_cpu(ms->m_pid);
+		__entry->m_lkid = le32_to_cpu(ms->m_lkid);
+		__entry->m_remid = le32_to_cpu(ms->m_remid);
+		__entry->m_parent_lkid = le32_to_cpu(ms->m_parent_lkid);
+		__entry->m_parent_remid = le32_to_cpu(ms->m_parent_remid);
+		__entry->m_exflags = le32_to_cpu(ms->m_exflags);
+		__entry->m_sbflags = le32_to_cpu(ms->m_sbflags);
+		__entry->m_flags = le32_to_cpu(ms->m_flags);
+		__entry->m_lvbseq = le32_to_cpu(ms->m_lvbseq);
+		__entry->m_hash = le32_to_cpu(ms->m_hash);
+		__entry->m_status = le32_to_cpu(ms->m_status);
+		__entry->m_grmode = le32_to_cpu(ms->m_grmode);
+		__entry->m_rqmode = le32_to_cpu(ms->m_rqmode);
+		__entry->m_bastmode = le32_to_cpu(ms->m_bastmode);
+		__entry->m_asts = le32_to_cpu(ms->m_asts);
+		__entry->m_result = le32_to_cpu(ms->m_result);
+		memcpy(__get_dynamic_array(m_extra), ms->m_extra,
+		       __get_dynamic_array_len(m_extra));
+	),
+
+	TP_printk("seq=%u h_version=%s h_lockspace=%u h_nodeid=%u "
+		  "h_length=%u h_cmd=%s m_type=%s m_nodeid=%u "
+		  "m_pid=%u m_lkid=%u m_remid=%u m_parent_lkid=%u "
+		  "m_parent_remid=%u m_exflags=%s m_sbflags=%s m_flags=%s "
+		  "m_lvbseq=%u m_hash=%u m_status=%d m_grmode=%s "
+		  "m_rqmode=%s m_bastmode=%s m_asts=%d m_result=%d "
+		  "m_extra=0x%s",
+		  __entry->seq, show_message_version(__entry->h_version),
+		  __entry->h_lockspace, __entry->h_nodeid, __entry->h_length,
+		  show_header_cmd(__entry->h_cmd),
+		  show_message_type(__entry->m_type),
+		  __entry->m_nodeid, __entry->m_pid, __entry->m_lkid,
+		  __entry->m_remid, __entry->m_parent_lkid,
+		  __entry->m_parent_remid, show_lock_flags(__entry->m_exflags),
+		  show_dlm_sb_flags(__entry->m_sbflags),
+		  show_lkb_flags(__entry->m_flags), __entry->m_lvbseq,
+		  __entry->m_hash, __entry->m_status,
+		  show_lock_mode(__entry->m_grmode),
+		  show_lock_mode(__entry->m_rqmode),
+		  show_lock_mode(__entry->m_bastmode),
+		  __entry->m_asts, __entry->m_result,
+		  __print_hex_str(__get_dynamic_array(m_extra),
+				  __get_dynamic_array_len(m_extra)))
+
+);
+
 TRACE_EVENT(dlm_send,
 
 	TP_PROTO(int nodeid, int ret),
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 06/18] fs: dlm: let dlm_add_cb queue work after resume only
From: Alexander Aring @ 2022-10-27 20:45 UTC
  To: cluster-devel.redhat.com

We should allow dlm_add_cb() to call queue_work() only after recovery
has queued the pending work for delayed lkbs. This patch moves the
clearing of LSFL_CB_DELAY to after the delayed lkb work was processed.
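
The ordering can be illustrated with a small state model. This is a
sketch under assumptions, not the dlm implementation: struct ls_model,
add_cb() and callback_resume() are hypothetical, the delay list is
reduced to a counter, and the batch size of 2 is arbitrary.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define BATCH 2	/* arbitrary batch size for this model */

struct ls_model {
	bool cb_delay;	/* models the LSFL_CB_DELAY flag */
	int delayed;	/* callbacks parked while the flag is set */
	int queued;	/* callbacks handed to the work queue */
};

/* Park the callback while delayed, otherwise queue it directly. */
static void add_cb(struct ls_model *ls)
{
	assert(ls != NULL);
	if (ls->cb_delay)
		ls->delayed++;
	else
		ls->queued++;
}

/* Drain parked callbacks in batches; clear the flag only when empty. */
static void callback_resume(struct ls_model *ls)
{
	bool empty;

	assert(ls != NULL);
	do {
		int count = 0;

		while (ls->delayed > 0 && count < BATCH) {
			ls->delayed--;
			ls->queued++;
			count++;
		}
		empty = (ls->delayed == 0);
		if (empty)
			ls->cb_delay = false;
	} while (!empty);
}
```

Because the flag stays set until the parked callbacks are drained, a
callback added in between is parked behind them instead of being queued
ahead of older work.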

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/ast.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index d60a8d8f109d..6e07c151ad28 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -308,8 +308,6 @@ void dlm_callback_resume(struct dlm_ls *ls)
 	if (!ls->ls_callback_wq)
 		return;
 
-	clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
-
 more:
 	mutex_lock(&ls->ls_cb_mutex);
 	list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
@@ -320,6 +318,8 @@ void dlm_callback_resume(struct dlm_ls *ls)
 			break;
 	}
 	empty = list_empty(&ls->ls_cb_delay);
+	if (empty)
+		clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
 	mutex_unlock(&ls->ls_cb_mutex);
 
 	sum += count;
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 07/18] fs: dlm: use list_first_entry marco
  2022-10-27 20:45 [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails Alexander Aring
                   ` (4 preceding siblings ...)
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 06/18] fs: dlm: let dlm_add_cb queue work after resume only Alexander Aring
@ 2022-10-27 20:45 ` Alexander Aring
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 08/18] fs: dlm: convert ls_cb_mutex mutex to spinlock Alexander Aring
                   ` (10 subsequent siblings)
  16 siblings, 0 replies; 20+ messages in thread
From: Alexander Aring @ 2022-10-27 20:45 UTC (permalink / raw)
  To: cluster-devel.redhat.com

Instead of using list_entry() this patch moves to using the
list_first_entry() macro.
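
For readers without the kernel headers at hand, the equivalence can be
shown with a small userspace sketch. The sketch_* macros and helpers
below are simplified, hypothetical re-implementations written for this
note, not the definitions from include/linux/list.h.

```c
#include <assert.h>
#include <stddef.h>

struct list_head {
	struct list_head *next, *prev;
};

/* Simplified userspace versions of the kernel helpers. */
#define sketch_container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))
#define sketch_list_entry(ptr, type, member) \
	sketch_container_of(ptr, type, member)
/* Takes the list head itself instead of head->next. */
#define sketch_list_first_entry(head, type, member) \
	sketch_list_entry((head)->next, type, member)

struct item {
	int id;
	struct list_head node;
};

static void sketch_list_init(struct list_head *head)
{
	assert(head != NULL);
	head->next = head;
	head->prev = head;
}

static void sketch_list_add_tail(struct list_head *entry,
				 struct list_head *head)
{
	assert(entry != NULL && head != NULL);
	entry->prev = head->prev;
	entry->next = head;
	head->prev->next = entry;
	head->prev = entry;
}
```

Both forms resolve to the same element. As with the kernel macro, the
sketch does not check for an empty list, so the caller has to make sure
the list is not empty first.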

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/user.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index c5d27bccc3dc..6a5de0918a96 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -857,7 +857,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
 	   without removing lkb_cb_list; so empty lkb_cb_list is always
 	   consistent with empty lkb_callbacks */
 
-	lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_cb_list);
+	lkb = list_first_entry(&proc->asts, struct dlm_lkb, lkb_cb_list);
 
 	/* rem_lkb_callback sets a new lkb_last_cast */
 	old_mode = lkb->lkb_last_cast.mode;
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 08/18] fs: dlm: convert ls_cb_mutex mutex to spinlock
  2022-10-27 20:45 [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails Alexander Aring
                   ` (5 preceding siblings ...)
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 07/18] fs: dlm: use list_first_entry marco Alexander Aring
@ 2022-10-27 20:45 ` Alexander Aring
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 09/18] fs: dlm: use spin lock instead of mutex Alexander Aring
                   ` (9 subsequent siblings)
  16 siblings, 0 replies; 20+ messages in thread
From: Alexander Aring @ 2022-10-27 20:45 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch converts the ls_cb_mutex mutex to a spinlock. Nothing sleeps
while this lock is held, so a mutex is not needed.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/ast.c          | 12 ++++++------
 fs/dlm/dlm_internal.h |  2 +-
 fs/dlm/lockspace.c    |  2 +-
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 6e07c151ad28..daaa0dff6ef4 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -200,13 +200,13 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 	if (!prev_seq) {
 		kref_get(&lkb->lkb_ref);
 
-		mutex_lock(&ls->ls_cb_mutex);
+		spin_lock(&ls->ls_cb_lock);
 		if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
 			list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
 		} else {
 			queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
 		}
-		mutex_unlock(&ls->ls_cb_mutex);
+		spin_unlock(&ls->ls_cb_lock);
 	}
  out:
 	mutex_unlock(&lkb->lkb_cb_mutex);
@@ -289,9 +289,9 @@ void dlm_callback_stop(struct dlm_ls *ls)
 void dlm_callback_suspend(struct dlm_ls *ls)
 {
 	if (ls->ls_callback_wq) {
-		mutex_lock(&ls->ls_cb_mutex);
+		spin_lock(&ls->ls_cb_lock);
 		set_bit(LSFL_CB_DELAY, &ls->ls_flags);
-		mutex_unlock(&ls->ls_cb_mutex);
+		spin_unlock(&ls->ls_cb_lock);
 
 		flush_workqueue(ls->ls_callback_wq);
 	}
@@ -309,7 +309,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
 		return;
 
 more:
-	mutex_lock(&ls->ls_cb_mutex);
+	spin_lock(&ls->ls_cb_lock);
 	list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
 		list_del_init(&lkb->lkb_cb_list);
 		queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
@@ -320,7 +320,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
 	empty = list_empty(&ls->ls_cb_delay);
 	if (empty)
 		clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
-	mutex_unlock(&ls->ls_cb_mutex);
+	spin_unlock(&ls->ls_cb_lock);
 
 	sum += count;
 	if (!empty) {
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index e34c3d2639a5..647a6a61531c 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -631,7 +631,7 @@ struct dlm_ls {
 
 	/* recovery related */
 
-	struct mutex		ls_cb_mutex;
+	spinlock_t		ls_cb_lock;
 	struct list_head	ls_cb_delay; /* save for queue_work later */
 	struct timer_list	ls_timer;
 	struct task_struct	*ls_recoverd_task;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index bae050df7abf..72fa8a9d7a40 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -567,7 +567,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	init_completion(&ls->ls_recovery_done);
 	ls->ls_recovery_result = -1;
 
-	mutex_init(&ls->ls_cb_mutex);
+	spin_lock_init(&ls->ls_cb_lock);
 	INIT_LIST_HEAD(&ls->ls_cb_delay);
 
 	ls->ls_recoverd_task = NULL;
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 09/18] fs: dlm: use spin lock instead of mutex
  2022-10-27 20:45 [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails Alexander Aring
                   ` (6 preceding siblings ...)
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 08/18] fs: dlm: convert ls_cb_mutex mutex to spinlock Alexander Aring
@ 2022-10-27 20:45 ` Alexander Aring
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 10/18] fs: dlm: move last cast bast time to function call Alexander Aring
                   ` (8 subsequent siblings)
  16 siblings, 0 replies; 20+ messages in thread
From: Alexander Aring @ 2022-10-27 20:45 UTC (permalink / raw)
  To: cluster-devel.redhat.com

There is no need to use a mutex in these hot path sections. We change it
to a spin lock to serve callbacks faster by not allowing scheduling.
The locked sections will not be held for a long time.
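
To make the trade-off concrete, here is a minimal userspace sketch of a
spin lock built on C11 atomics. It is an illustration only: the
sketch_* names are hypothetical and the kernel's spinlock_t is
implemented differently.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical userspace spin lock; not the kernel's spinlock_t. */
struct sketch_spinlock {
	atomic_flag locked;
};

/* Busy-wait until the flag is acquired; the caller never sleeps. */
static void sketch_spin_lock(struct sketch_spinlock *l)
{
	assert(l != NULL);
	while (atomic_flag_test_and_set_explicit(&l->locked,
						 memory_order_acquire))
		;
}

/* Returns 1 if the lock was taken, 0 if it was already held. */
static int sketch_spin_trylock(struct sketch_spinlock *l)
{
	assert(l != NULL);
	return !atomic_flag_test_and_set_explicit(&l->locked,
						  memory_order_acquire);
}

static void sketch_spin_unlock(struct sketch_spinlock *l)
{
	assert(l != NULL);
	atomic_flag_clear_explicit(&l->locked, memory_order_release);
}
```

A waiter burns CPU instead of being scheduled out, which only pays off
when the protected section is short and never sleeps.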

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/ast.c          | 8 ++++----
 fs/dlm/dlm_internal.h | 2 +-
 fs/dlm/lock.c         | 2 +-
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index daaa0dff6ef4..3e76ec75bc55 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -190,7 +190,7 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 		return;
 	}
 
-	mutex_lock(&lkb->lkb_cb_mutex);
+	spin_lock(&lkb->lkb_cb_lock);
 	prev_seq = lkb->lkb_callbacks[0].seq;
 
 	rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
@@ -209,7 +209,7 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 		spin_unlock(&ls->ls_cb_lock);
 	}
  out:
-	mutex_unlock(&lkb->lkb_cb_mutex);
+	spin_unlock(&lkb->lkb_cb_lock);
 }
 
 void dlm_callback_work(struct work_struct *work)
@@ -223,7 +223,7 @@ void dlm_callback_work(struct work_struct *work)
 
 	memset(&callbacks, 0, sizeof(callbacks));
 
-	mutex_lock(&lkb->lkb_cb_mutex);
+	spin_lock(&lkb->lkb_cb_lock);
 	if (!lkb->lkb_callbacks[0].seq) {
 		/* no callback work exists, shouldn't happen */
 		log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
@@ -244,7 +244,7 @@ void dlm_callback_work(struct work_struct *work)
 		dlm_print_lkb(lkb);
 		dlm_dump_lkb_callbacks(lkb);
 	}
-	mutex_unlock(&lkb->lkb_cb_mutex);
+	spin_unlock(&lkb->lkb_cb_lock);
 
 	castfn = lkb->lkb_astfn;
 	bastfn = lkb->lkb_bastfn;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 647a6a61531c..11f3a5c67bdd 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -268,7 +268,7 @@ struct dlm_lkb {
 	unsigned long		lkb_timeout_cs;
 #endif
 
-	struct mutex		lkb_cb_mutex;
+	spinlock_t		lkb_cb_lock;
 	struct work_struct	lkb_cb_work;
 	struct list_head	lkb_cb_list; /* for ls_cb_delay or proc->asts */
 	struct dlm_callback	lkb_callbacks[DLM_CALLBACKS_SIZE];
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 0b1bc24536ce..40e4e4a1c582 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1218,7 +1218,7 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 	INIT_LIST_HEAD(&lkb->lkb_time_list);
 #endif
 	INIT_LIST_HEAD(&lkb->lkb_cb_list);
-	mutex_init(&lkb->lkb_cb_mutex);
+	spin_lock_init(&lkb->lkb_cb_lock);
 	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
 
 	idr_preload(GFP_NOFS);
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 10/18] fs: dlm: move last cast bast time to function call
  2022-10-27 20:45 [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails Alexander Aring
                   ` (7 preceding siblings ...)
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 09/18] fs: dlm: use spin lock instead of mutex Alexander Aring
@ 2022-10-27 20:45 ` Alexander Aring
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 11/18] fs: dlm: use a non-static queue for callbacks Alexander Aring
                   ` (7 subsequent siblings)
  16 siblings, 0 replies; 20+ messages in thread
From: Alexander Aring @ 2022-10-27 20:45 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch moves the recording of the last cast and bast times, which
are debugging information, to the point where the cast and bast
functions are called.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/ast.c | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 3e76ec75bc55..8393d2090c1c 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -158,15 +158,11 @@ int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
 		}
 	}
 
-	if (cb->flags & DLM_CB_CAST) {
+	if (cb->flags & DLM_CB_CAST)
 		memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback));
-		lkb->lkb_last_cast_time = ktime_get();
-	}
 
-	if (cb->flags & DLM_CB_BAST) {
+	if (cb->flags & DLM_CB_BAST)
 		memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
-		lkb->lkb_last_bast_time = ktime_get();
-	}
 	rv = 0;
  out:
 	return rv;
@@ -256,11 +252,13 @@ void dlm_callback_work(struct work_struct *work)
 			continue;
 		} else if (callbacks[i].flags & DLM_CB_BAST) {
 			trace_dlm_bast(ls, lkb, callbacks[i].mode);
+			lkb->lkb_last_bast_time = ktime_get();
 			bastfn(lkb->lkb_astparam, callbacks[i].mode);
 		} else if (callbacks[i].flags & DLM_CB_CAST) {
 			lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
 			lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
 			trace_dlm_ast(ls, lkb);
+			lkb->lkb_last_cast_time = ktime_get();
 			castfn(lkb->lkb_astparam);
 		}
 	}
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 11/18] fs: dlm: use a non-static queue for callbacks
  2022-10-27 20:45 [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails Alexander Aring
                   ` (8 preceding siblings ...)
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 10/18] fs: dlm: move last cast bast time to function call Alexander Aring
@ 2022-10-27 20:45 ` Alexander Aring
  2022-10-28 21:17   ` Alexander Aring
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 12/18] fs: dlm: allow different allocation context per _create_message Alexander Aring
                   ` (6 subsequent siblings)
  16 siblings, 1 reply; 20+ messages in thread
From: Alexander Aring @ 2022-10-27 20:45 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch introduces a queue implementation for callbacks using Linux
lists. The current callback queue is implemented with a static limit of
6 entries, see DLM_CALLBACKS_SIZE. The sequence number inside the
callback structure was used to tell whether an entry in the static
array was valid. With a dynamic data structure that grows and shrinks
at runtime, sequence numbers are no longer needed for that purpose.
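
The enqueue and dequeue return codes can be illustrated with a minimal
userspace sketch. It uses a singly linked list and malloc() rather than
the kernel list API and the slab cache, and all sketch_* and SKETCH_*
names are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>

enum {
	SKETCH_ENQ_FAILURE = -1,
	SKETCH_ENQ_SUCCESS = 0,
	SKETCH_ENQ_NEED_SCHED = 1,
};

enum {
	SKETCH_DEQ_SUCCESS = 0,
	SKETCH_DEQ_LAST = 1,
	SKETCH_DEQ_EMPTY = 2,
};

struct sketch_cb {
	int mode;
	struct sketch_cb *next;
};

struct sketch_lkb {
	struct sketch_cb *head, *tail;
	int need_sched;		/* stands in for DLM_IFL_NEED_SCHED */
};

/* Append a callback; report whether the worker must be scheduled. */
static int sketch_enqueue(struct sketch_lkb *lkb, int mode)
{
	struct sketch_cb *cb;
	int rv = SKETCH_ENQ_SUCCESS;

	assert(lkb != NULL);
	cb = malloc(sizeof(*cb));
	if (!cb)
		return SKETCH_ENQ_FAILURE;

	cb->mode = mode;
	cb->next = NULL;
	if (lkb->tail)
		lkb->tail->next = cb;
	else
		lkb->head = cb;
	lkb->tail = cb;

	if (!lkb->need_sched) {
		lkb->need_sched = 1;
		rv = SKETCH_ENQ_NEED_SCHED;
	}
	return rv;
}

/* Pop the oldest callback; the caller owns and frees *cb. */
static int sketch_dequeue(struct sketch_lkb *lkb, struct sketch_cb **cb)
{
	assert(lkb != NULL && cb != NULL);
	*cb = lkb->head;
	if (!*cb)
		return SKETCH_DEQ_EMPTY;

	lkb->head = (*cb)->next;
	if (!lkb->head) {
		lkb->tail = NULL;
		return SKETCH_DEQ_LAST;
	}
	return SKETCH_DEQ_SUCCESS;
}
```

Only the first enqueue reports that the worker needs scheduling. In
this sketch the flag stays set until the consumer clears it, which
mirrors how the patch clears DLM_IFL_NEED_SCHED once the queue has been
drained.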

We assume that every callback, once queued, will be delivered to the DLM
user. Therefore the callback flag DLM_CB_SKIP is dropped, and the check
for skipping a bast is moved to enqueue time, before the worker is
scheduled, instead of skipping while the callback worker executes. This
reduces unnecessary scheduling of the callback worker.
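
One of the enqueue-time checks, the suppression of a bast that repeats
the previous bast for the same or a more restrictive mode, can be
sketched as a plain predicate. The SKETCH_LOCK_* values are assumed to
mirror the DLM_LOCK_* mode constants, and sketch_bast_is_redundant() is
a hypothetical helper that does not exist in the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed to mirror the DLM_LOCK_* mode values; illustrative only. */
enum {
	SKETCH_LOCK_NL = 0,
	SKETCH_LOCK_CR = 1,
	SKETCH_LOCK_CW = 2,
	SKETCH_LOCK_PR = 3,
	SKETCH_LOCK_PW = 4,
	SKETCH_LOCK_EX = 5,
};

/*
 * A new bast is redundant when the previously queued bast was for the
 * same mode or for a more restrictive one. The extra "> PR" test keeps
 * the PR/CW pair apart, since neither mode covers the other.
 */
static bool sketch_bast_is_redundant(int prev_mode, int mode)
{
	assert(prev_mode >= SKETCH_LOCK_NL && prev_mode <= SKETCH_LOCK_EX);
	return prev_mode == mode ||
	       (prev_mode > mode && prev_mode > SKETCH_LOCK_PR);
}
```

Filtering at this point means a redundant bast never allocates a
callback and never reaches the worker.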

All saved last callbacks are pointers now and no longer need to be
copied. Callback structures carry a reference counter, which takes care
of freeing them at the right time once they are no longer referenced.
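
The pointer hand-over can be sketched in userspace with a plain integer
count in place of struct kref. All sketch_* names are hypothetical, and
the sketch takes the new reference before dropping the old one, which
is a choice made here and differs from the order in the patch.

```c
#include <assert.h>
#include <stdlib.h>

struct sketch_cb {
	int refs;
	int mode;
};

/* Counts frees so that the behaviour can be observed in a test. */
static int sketch_freed;

static struct sketch_cb *sketch_cb_alloc(int mode)
{
	struct sketch_cb *cb = malloc(sizeof(*cb));

	if (cb) {
		cb->refs = 1;	/* reference held by the queue */
		cb->mode = mode;
	}
	return cb;
}

static void sketch_cb_get(struct sketch_cb *cb)
{
	assert(cb != NULL && cb->refs > 0);
	cb->refs++;
}

static void sketch_cb_put(struct sketch_cb *cb)
{
	assert(cb != NULL && cb->refs > 0);
	if (--cb->refs == 0) {
		free(cb);
		sketch_freed++;
	}
}

/* Repoint a "last callback" slot and keep the counts balanced. */
static void sketch_set_last_ptr(struct sketch_cb **from,
				struct sketch_cb *to)
{
	assert(from != NULL);
	if (to)
		sketch_cb_get(to);	/* first, so *from == to is safe */
	if (*from)
		sketch_cb_put(*from);
	*from = to;
}
```

A callback that is still referenced as the last cast survives its
removal from the queue and is freed only when the slot is repointed or
cleared.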

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/ast.c          | 294 ++++++++++++++++++------------------------
 fs/dlm/ast.h          |  17 ++-
 fs/dlm/debug_fs.c     |   2 +-
 fs/dlm/dlm_internal.h |  15 ++-
 fs/dlm/lock.c         |   8 +-
 fs/dlm/memory.c       |  26 ++++
 fs/dlm/memory.h       |   2 +
 fs/dlm/user.c         |  73 ++++++-----
 fs/dlm/user.h         |   2 +-
 9 files changed, 222 insertions(+), 217 deletions(-)

diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 8393d2090c1c..078bbbd43a53 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -12,55 +12,68 @@
 #include <trace/events/dlm.h>
 
 #include "dlm_internal.h"
+#include "memory.h"
 #include "lock.h"
 #include "user.h"
 #include "ast.h"
 
-static uint64_t dlm_cb_seq;
-static DEFINE_SPINLOCK(dlm_cb_seq_spin);
+void dlm_release_callback(struct kref *ref)
+{
+	struct dlm_callback *cb = container_of(ref, struct dlm_callback, ref);
+
+	dlm_free_cb(cb);
+}
+
+void dlm_callback_set_last_ptr(struct dlm_callback **from,
+			       struct dlm_callback *to)
+{
+	if (*from)
+		kref_put(&(*from)->ref, dlm_release_callback);
+
+	if (to)
+		kref_get(&to->ref);
+
+	*from = to;
+}
 
-static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
+void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
 {
-	int i;
-
-	log_print("last_bast %x %llu flags %x mode %d sb %d %x",
-		  lkb->lkb_id,
-		  (unsigned long long)lkb->lkb_last_bast.seq,
-		  lkb->lkb_last_bast.flags,
-		  lkb->lkb_last_bast.mode,
-		  lkb->lkb_last_bast.sb_status,
-		  lkb->lkb_last_bast.sb_flags);
-
-	log_print("last_cast %x %llu flags %x mode %d sb %d %x",
-		  lkb->lkb_id,
-		  (unsigned long long)lkb->lkb_last_cast.seq,
-		  lkb->lkb_last_cast.flags,
-		  lkb->lkb_last_cast.mode,
-		  lkb->lkb_last_cast.sb_status,
-		  lkb->lkb_last_cast.sb_flags);
-
-	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
-		log_print("cb %x %llu flags %x mode %d sb %d %x",
-			  lkb->lkb_id,
-			  (unsigned long long)lkb->lkb_callbacks[i].seq,
-			  lkb->lkb_callbacks[i].flags,
-			  lkb->lkb_callbacks[i].mode,
-			  lkb->lkb_callbacks[i].sb_status,
-			  lkb->lkb_callbacks[i].sb_flags);
+	struct dlm_callback *cb, *safe;
+
+	list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
+		list_del(&cb->list);
+		kref_put(&cb->ref, dlm_release_callback);
 	}
+
+	/* TODO */
+	lkb->lkb_flags &= ~DLM_IFL_NEED_SCHED;
+
+	/* invalidate */
+	dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
+	dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
+	lkb->lkb_last_bast_mode = -1;
 }
 
-int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
-			 int status, uint32_t sbflags, uint64_t seq)
+int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
+			     int status, uint32_t sbflags)
 {
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
-	uint64_t prev_seq;
+	int rv = DLM_ENQUEUE_CALLBACK_SUCCESS;
+	struct dlm_callback *cb;
 	int prev_mode;
-	int i, rv;
 
-	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
-		if (lkb->lkb_callbacks[i].seq)
-			continue;
+	if (flags & DLM_CB_BAST) {
+		/* if cb is a bast, it should be skipped if the blocking mode is
+		 * compatible with the last granted mode
+		 */
+		if (lkb->lkb_last_cast) {
+			if (dlm_modes_compat(mode, lkb->lkb_last_cast->mode)) {
+				log_debug(ls, "skip %x bast mode %d for cast mode %d",
+					  lkb->lkb_id, mode,
+					  lkb->lkb_last_cast->mode);
+				goto out;
+			}
+		}
 
 		/*
 		 * Suppress some redundant basts here, do more on removal.
@@ -68,132 +81,75 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
 		 * is a bast for the same mode or a more restrictive mode.
 		 * (the addional > PR check is needed for PR/CW inversion)
 		 */
-
-		if ((i > 0) && (flags & DLM_CB_BAST) &&
-		    (lkb->lkb_callbacks[i-1].flags & DLM_CB_BAST)) {
-
-			prev_seq = lkb->lkb_callbacks[i-1].seq;
-			prev_mode = lkb->lkb_callbacks[i-1].mode;
+		if (lkb->lkb_last_cb && lkb->lkb_last_cb->flags & DLM_CB_BAST) {
+			prev_mode = lkb->lkb_last_cb->mode;
 
 			if ((prev_mode == mode) ||
 			    (prev_mode > mode && prev_mode > DLM_LOCK_PR)) {
-
-				log_debug(ls, "skip %x add bast %llu mode %d "
-					  "for bast %llu mode %d",
-					  lkb->lkb_id,
-					  (unsigned long long)seq,
-					  mode,
-					  (unsigned long long)prev_seq,
-					  prev_mode);
-				rv = 0;
+				log_debug(ls, "skip %x add bast mode %d for bast mode %d",
+					  lkb->lkb_id, mode, prev_mode);
 				goto out;
 			}
 		}
-
-		lkb->lkb_callbacks[i].seq = seq;
-		lkb->lkb_callbacks[i].flags = flags;
-		lkb->lkb_callbacks[i].mode = mode;
-		lkb->lkb_callbacks[i].sb_status = status;
-		lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
-		rv = 0;
-		break;
 	}
 
-	if (i == DLM_CALLBACKS_SIZE) {
-		log_error(ls, "no callbacks %x %llu flags %x mode %d sb %d %x",
-			  lkb->lkb_id, (unsigned long long)seq,
-			  flags, mode, status, sbflags);
-		dlm_dump_lkb_callbacks(lkb);
-		rv = -1;
+	cb = dlm_allocate_cb();
+	if (!cb) {
+		rv = DLM_ENQUEUE_CALLBACK_FAILURE;
 		goto out;
 	}
- out:
-	return rv;
-}
-
-int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
-			 struct dlm_callback *cb, int *resid)
-{
-	int i, rv;
-
-	*resid = 0;
-
-	if (!lkb->lkb_callbacks[0].seq) {
-		rv = -ENOENT;
-		goto out;
-	}
-
-	/* oldest undelivered cb is callbacks[0] */
-
-	memcpy(cb, &lkb->lkb_callbacks[0], sizeof(struct dlm_callback));
-	memset(&lkb->lkb_callbacks[0], 0, sizeof(struct dlm_callback));
 
-	/* shift others down */
-
-	for (i = 1; i < DLM_CALLBACKS_SIZE; i++) {
-		if (!lkb->lkb_callbacks[i].seq)
-			break;
-		memcpy(&lkb->lkb_callbacks[i-1], &lkb->lkb_callbacks[i],
-		       sizeof(struct dlm_callback));
-		memset(&lkb->lkb_callbacks[i], 0, sizeof(struct dlm_callback));
-		(*resid)++;
+	cb->flags = flags;
+	cb->mode = mode;
+	cb->sb_status = status;
+	cb->sb_flags = (sbflags & 0x000000FF);
+	kref_init(&cb->ref);
+	if (!(lkb->lkb_flags & DLM_IFL_NEED_SCHED)) {
+		lkb->lkb_flags |= DLM_IFL_NEED_SCHED;
+		rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
 	}
+	list_add_tail(&cb->list, &lkb->lkb_callbacks);
 
-	/* if cb is a bast, it should be skipped if the blocking mode is
-	   compatible with the last granted mode */
-
-	if ((cb->flags & DLM_CB_BAST) && lkb->lkb_last_cast.seq) {
-		if (dlm_modes_compat(cb->mode, lkb->lkb_last_cast.mode)) {
-			cb->flags |= DLM_CB_SKIP;
-
-			log_debug(ls, "skip %x bast %llu mode %d "
-				  "for cast %llu mode %d",
-				  lkb->lkb_id,
-				  (unsigned long long)cb->seq,
-				  cb->mode,
-				  (unsigned long long)lkb->lkb_last_cast.seq,
-				  lkb->lkb_last_cast.mode);
-			rv = 0;
-			goto out;
-		}
-	}
+	if (flags & DLM_CB_CAST)
+		dlm_callback_set_last_ptr(&lkb->lkb_last_cast, cb);
 
-	if (cb->flags & DLM_CB_CAST)
-		memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback));
+	dlm_callback_set_last_ptr(&lkb->lkb_last_cb, cb);
 
-	if (cb->flags & DLM_CB_BAST)
-		memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
-	rv = 0;
  out:
 	return rv;
 }
 
+int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb)
+{
+	/* oldest undelivered cb is callbacks first entry */
+	*cb = list_first_entry_or_null(&lkb->lkb_callbacks,
+				       struct dlm_callback, list);
+	if (!*cb)
+		return DLM_DEQUEUE_CALLBACK_EMPTY;
+
+	/* remove it from callbacks so shift others down */
+	list_del(&(*cb)->list);
+	if (list_empty(&lkb->lkb_callbacks))
+		return DLM_DEQUEUE_CALLBACK_LAST;
+
+	return DLM_DEQUEUE_CALLBACK_SUCCESS;
+}
+
 void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 		uint32_t sbflags)
 {
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
-	uint64_t new_seq, prev_seq;
 	int rv;
 
-	spin_lock(&dlm_cb_seq_spin);
-	new_seq = ++dlm_cb_seq;
-	if (!dlm_cb_seq)
-		new_seq = ++dlm_cb_seq;
-	spin_unlock(&dlm_cb_seq_spin);
-
 	if (lkb->lkb_flags & DLM_IFL_USER) {
-		dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
+		dlm_user_add_ast(lkb, flags, mode, status, sbflags);
 		return;
 	}
 
 	spin_lock(&lkb->lkb_cb_lock);
-	prev_seq = lkb->lkb_callbacks[0].seq;
-
-	rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
-	if (rv < 0)
-		goto out;
-
-	if (!prev_seq) {
+	rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
+	switch (rv) {
+	case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
 		kref_get(&lkb->lkb_ref);
 
 		spin_lock(&ls->ls_cb_lock);
@@ -203,8 +159,16 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 			queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
 		}
 		spin_unlock(&ls->ls_cb_lock);
+		break;
+	case DLM_ENQUEUE_CALLBACK_FAILURE:
+		WARN_ON(1);
+		break;
+	case DLM_ENQUEUE_CALLBACK_SUCCESS:
+		break;
+	default:
+		WARN_ON(1);
+		break;
 	}
- out:
 	spin_unlock(&lkb->lkb_cb_lock);
 }
 
@@ -214,53 +178,43 @@ void dlm_callback_work(struct work_struct *work)
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	void (*castfn) (void *astparam);
 	void (*bastfn) (void *astparam, int mode);
-	struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
-	int i, rv, resid;
-
-	memset(&callbacks, 0, sizeof(callbacks));
+	struct dlm_callback *cb;
+	int rv;
 
 	spin_lock(&lkb->lkb_cb_lock);
-	if (!lkb->lkb_callbacks[0].seq) {
-		/* no callback work exists, shouldn't happen */
-		log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
-		dlm_print_lkb(lkb);
-		dlm_dump_lkb_callbacks(lkb);
-	}
-
-	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
-		rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
-		if (rv < 0)
-			break;
-	}
-
-	if (resid) {
-		/* cbs remain, loop should have removed all, shouldn't happen */
-		log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id,
-			  resid);
-		dlm_print_lkb(lkb);
-		dlm_dump_lkb_callbacks(lkb);
-	}
+	rv = dlm_dequeue_lkb_callback(lkb, &cb);
 	spin_unlock(&lkb->lkb_cb_lock);
 
-	castfn = lkb->lkb_astfn;
-	bastfn = lkb->lkb_bastfn;
+	if (WARN_ON(rv == DLM_DEQUEUE_CALLBACK_EMPTY))
+		return;
 
-	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
-		if (!callbacks[i].seq)
-			break;
-		if (callbacks[i].flags & DLM_CB_SKIP) {
-			continue;
-		} else if (callbacks[i].flags & DLM_CB_BAST) {
-			trace_dlm_bast(ls, lkb, callbacks[i].mode);
+	for (;;) {
+		castfn = lkb->lkb_astfn;
+		bastfn = lkb->lkb_bastfn;
+
+		if (cb->flags & DLM_CB_BAST) {
+			trace_dlm_bast(ls, lkb, cb->mode);
 			lkb->lkb_last_bast_time = ktime_get();
-			bastfn(lkb->lkb_astparam, callbacks[i].mode);
-		} else if (callbacks[i].flags & DLM_CB_CAST) {
-			lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
-			lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
+			lkb->lkb_last_bast_mode = cb->mode;
+			bastfn(lkb->lkb_astparam, cb->mode);
+		} else if (cb->flags & DLM_CB_CAST) {
+			lkb->lkb_lksb->sb_status = cb->sb_status;
+			lkb->lkb_lksb->sb_flags = cb->sb_flags;
 			trace_dlm_ast(ls, lkb);
 			lkb->lkb_last_cast_time = ktime_get();
 			castfn(lkb->lkb_astparam);
 		}
+
+		kref_put(&cb->ref, dlm_release_callback);
+
+		spin_lock(&lkb->lkb_cb_lock);
+		rv = dlm_dequeue_lkb_callback(lkb, &cb);
+		if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) {
+			lkb->lkb_flags &= ~DLM_IFL_NEED_SCHED;
+			spin_unlock(&lkb->lkb_cb_lock);
+			break;
+		}
+		spin_unlock(&lkb->lkb_cb_lock);
 	}
 
 	/* undo kref_get from dlm_add_callback, may cause lkb to be freed */
diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h
index e5e05fcc5813..880b11882495 100644
--- a/fs/dlm/ast.h
+++ b/fs/dlm/ast.h
@@ -11,13 +11,22 @@
 #ifndef __ASTD_DOT_H__
 #define __ASTD_DOT_H__
 
-int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
-                         int status, uint32_t sbflags, uint64_t seq);
-int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
-                         struct dlm_callback *cb, int *resid);
+#define DLM_ENQUEUE_CALLBACK_NEED_SCHED	1
+#define DLM_ENQUEUE_CALLBACK_SUCCESS	0
+#define DLM_ENQUEUE_CALLBACK_FAILURE	-1
+int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
+			     int status, uint32_t sbflags);
+#define DLM_DEQUEUE_CALLBACK_EMPTY	2
+#define DLM_DEQUEUE_CALLBACK_LAST	1
+#define DLM_DEQUEUE_CALLBACK_SUCCESS	0
+int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb);
 void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
                 uint32_t sbflags);
+void dlm_callback_set_last_ptr(struct dlm_callback **from,
+			       struct dlm_callback *to);
 
+void dlm_release_callback(struct kref *ref);
+void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb);
 void dlm_callback_work(struct work_struct *work);
 int dlm_callback_start(struct dlm_ls *ls);
 void dlm_callback_stop(struct dlm_ls *ls);
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 8fb04ebbafb5..8a0e1b1f74ad 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -246,7 +246,7 @@ static void print_format3_lock(struct seq_file *s, struct dlm_lkb *lkb,
 		   lkb->lkb_status,
 		   lkb->lkb_grmode,
 		   lkb->lkb_rqmode,
-		   lkb->lkb_last_bast.mode,
+		   lkb->lkb_last_bast_mode,
 		   rsb_lookup,
 		   lkb->lkb_wait_type,
 		   lkb->lkb_lvbseq,
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 11f3a5c67bdd..c03f34fed5e9 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -211,6 +211,7 @@ struct dlm_args {
 #endif
 #define DLM_IFL_DEADLOCK_CANCEL	0x01000000
 #define DLM_IFL_STUB_MS		0x02000000 /* magic number for m_flags */
+#define DLM_IFL_NEED_SCHED	0x04000000
 /* least significant 2 bytes are message changed, they are full transmitted
  * but at receive side only the 2 bytes LSB will be set.
  *
@@ -222,18 +223,17 @@ struct dlm_args {
 #define DLM_IFL_USER		0x00000001
 #define DLM_IFL_ORPHAN		0x00000002
 
-#define DLM_CALLBACKS_SIZE	6
-
 #define DLM_CB_CAST		0x00000001
 #define DLM_CB_BAST		0x00000002
-#define DLM_CB_SKIP		0x00000004
 
 struct dlm_callback {
-	uint64_t		seq;
 	uint32_t		flags;		/* DLM_CBF_ */
 	int			sb_status;	/* copy to lksb status */
 	uint8_t			sb_flags;	/* copy to lksb flags */
 	int8_t			mode; /* rq mode of bast, gr mode of cast */
+
+	struct list_head	list;
+	struct kref		ref;
 };
 
 struct dlm_lkb {
@@ -271,9 +271,10 @@ struct dlm_lkb {
 	spinlock_t		lkb_cb_lock;
 	struct work_struct	lkb_cb_work;
 	struct list_head	lkb_cb_list; /* for ls_cb_delay or proc->asts */
-	struct dlm_callback	lkb_callbacks[DLM_CALLBACKS_SIZE];
-	struct dlm_callback	lkb_last_cast;
-	struct dlm_callback	lkb_last_bast;
+	struct list_head	lkb_callbacks;
+	struct dlm_callback	*lkb_last_cast;
+	struct dlm_callback	*lkb_last_cb;
+	int			lkb_last_bast_mode;
 	ktime_t			lkb_last_cast_time;	/* for debugging */
 	ktime_t			lkb_last_bast_time;	/* for debugging */
 
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 40e4e4a1c582..f4b2fb17bbb1 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1209,6 +1209,7 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 	if (!lkb)
 		return -ENOMEM;
 
+	lkb->lkb_last_bast_mode = -1;
 	lkb->lkb_nodeid = -1;
 	lkb->lkb_grmode = DLM_LOCK_IV;
 	kref_init(&lkb->lkb_ref);
@@ -1218,6 +1219,7 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 	INIT_LIST_HEAD(&lkb->lkb_time_list);
 #endif
 	INIT_LIST_HEAD(&lkb->lkb_cb_list);
+	INIT_LIST_HEAD(&lkb->lkb_callbacks);
 	spin_lock_init(&lkb->lkb_cb_lock);
 	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
 
@@ -6221,8 +6223,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 	}
 
 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
-		memset(&lkb->lkb_callbacks, 0,
-		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
+		dlm_purge_lkb_callbacks(lkb);
 		list_del_init(&lkb->lkb_cb_list);
 		dlm_put_lkb(lkb);
 	}
@@ -6263,8 +6264,7 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 
 	spin_lock(&proc->asts_spin);
 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
-		memset(&lkb->lkb_callbacks, 0,
-		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
+		dlm_purge_lkb_callbacks(lkb);
 		list_del_init(&lkb->lkb_cb_list);
 		dlm_put_lkb(lkb);
 	}
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index ce35c3c19aeb..61fe0d1f5646 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -14,12 +14,14 @@
 #include "lowcomms.h"
 #include "config.h"
 #include "memory.h"
+#include "ast.h"
 
 static struct kmem_cache *writequeue_cache;
 static struct kmem_cache *mhandle_cache;
 static struct kmem_cache *msg_cache;
 static struct kmem_cache *lkb_cache;
 static struct kmem_cache *rsb_cache;
+static struct kmem_cache *cb_cache;
 
 
 int __init dlm_memory_init(void)
@@ -46,8 +48,16 @@ int __init dlm_memory_init(void)
 	if (!rsb_cache)
 		goto rsb;
 
+	cb_cache = kmem_cache_create("dlm_cb", sizeof(struct dlm_callback),
+				     __alignof__(struct dlm_callback), 0,
+				     NULL);
+	if (!cb_cache)
+		goto cb;
+
 	return 0;
 
+cb:
+	kmem_cache_destroy(rsb_cache);
 rsb:
 	kmem_cache_destroy(msg_cache);
 msg:
@@ -67,6 +77,7 @@ void dlm_memory_exit(void)
 	kmem_cache_destroy(msg_cache);
 	kmem_cache_destroy(lkb_cache);
 	kmem_cache_destroy(rsb_cache);
+	kmem_cache_destroy(cb_cache);
 }
 
 char *dlm_allocate_lvb(struct dlm_ls *ls)
@@ -115,6 +126,11 @@ void dlm_free_lkb(struct dlm_lkb *lkb)
 			kfree(ua);
 		}
 	}
+
+	/* drop references if they are set */
+	dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
+	dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
+
 	kmem_cache_free(lkb_cache, lkb);
 }
 
@@ -147,3 +163,13 @@ void dlm_free_msg(struct dlm_msg *msg)
 {
 	kmem_cache_free(msg_cache, msg);
 }
+
+struct dlm_callback *dlm_allocate_cb(void)
+{
+	return kmem_cache_alloc(cb_cache, GFP_ATOMIC);
+}
+
+void dlm_free_cb(struct dlm_callback *cb)
+{
+	kmem_cache_free(cb_cache, cb);
+}
diff --git a/fs/dlm/memory.h b/fs/dlm/memory.h
index 7bd3f1a391ca..c1583ec8b2cf 100644
--- a/fs/dlm/memory.h
+++ b/fs/dlm/memory.h
@@ -26,6 +26,8 @@ struct writequeue_entry *dlm_allocate_writequeue(void);
 void dlm_free_writequeue(struct writequeue_entry *writequeue);
 struct dlm_msg *dlm_allocate_msg(gfp_t allocation);
 void dlm_free_msg(struct dlm_msg *msg);
+struct dlm_callback *dlm_allocate_cb(void);
+void dlm_free_cb(struct dlm_callback *cb);
 
 #endif		/* __MEMORY_DOT_H__ */
 
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 6a5de0918a96..6b530db4bc0b 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -25,6 +25,7 @@
 #include "user.h"
 #include "ast.h"
 #include "config.h"
+#include "memory.h"
 
 static const char name_prefix[] = "dlm";
 static const struct file_operations device_fops;
@@ -175,7 +176,7 @@ static int lkb_is_endoflife(int mode, int status)
    being removed and then remove that lkb from the orphans list and free it */
 
 void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
-		      int status, uint32_t sbflags, uint64_t seq)
+		      int status, uint32_t sbflags)
 {
 	struct dlm_ls *ls;
 	struct dlm_user_args *ua;
@@ -209,16 +210,22 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
 
 	spin_lock(&proc->asts_spin);
 
-	rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, seq);
-	if (rv < 0) {
+	rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
+	switch (rv) {
+	case DLM_ENQUEUE_CALLBACK_FAILURE:
 		spin_unlock(&proc->asts_spin);
+		WARN_ON(1);
 		goto out;
-	}
-
-	if (list_empty(&lkb->lkb_cb_list)) {
+	case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
 		kref_get(&lkb->lkb_ref);
 		list_add_tail(&lkb->lkb_cb_list, &proc->asts);
 		wake_up_interruptible(&proc->wait);
+		break;
+	case DLM_ENQUEUE_CALLBACK_SUCCESS:
+		break;
+	default:
+		WARN_ON(1);
+		break;
 	}
 	spin_unlock(&proc->asts_spin);
 
@@ -800,8 +807,8 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
 	struct dlm_user_proc *proc = file->private_data;
 	struct dlm_lkb *lkb;
 	DECLARE_WAITQUEUE(wait, current);
-	struct dlm_callback cb;
-	int rv, resid, copy_lvb = 0;
+	struct dlm_callback *cb;
+	int rv, copy_lvb = 0;
 	int old_mode, new_mode;
 
 	if (count == sizeof(struct dlm_device_version)) {
@@ -860,50 +867,56 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
 	lkb = list_first_entry(&proc->asts, struct dlm_lkb, lkb_cb_list);
 
 	/* rem_lkb_callback sets a new lkb_last_cast */
-	old_mode = lkb->lkb_last_cast.mode;
+	old_mode = lkb->lkb_last_cast->mode;
 
-	rv = dlm_rem_lkb_callback(lkb->lkb_resource->res_ls, lkb, &cb, &resid);
-	if (rv < 0) {
+	rv = dlm_dequeue_lkb_callback(lkb, &cb);
+	switch (rv) {
+	case DLM_DEQUEUE_CALLBACK_EMPTY:
 		/* this shouldn't happen; lkb should have been removed from
-		   list when resid was zero */
+		 * list when last item was dequeued
+		 */
 		log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
 		list_del_init(&lkb->lkb_cb_list);
 		spin_unlock(&proc->asts_spin);
 		/* removes ref for proc->asts, may cause lkb to be freed */
 		dlm_put_lkb(lkb);
+		WARN_ON(1);
 		goto try_another;
-	}
-	if (!resid)
+	case DLM_DEQUEUE_CALLBACK_LAST:
 		list_del_init(&lkb->lkb_cb_list);
-	spin_unlock(&proc->asts_spin);
-
-	if (cb.flags & DLM_CB_SKIP) {
-		/* removes ref for proc->asts, may cause lkb to be freed */
-		if (!resid)
-			dlm_put_lkb(lkb);
-		goto try_another;
+		/* TODO */
+		lkb->lkb_flags &= ~DLM_IFL_NEED_SCHED;
+		break;
+	case DLM_DEQUEUE_CALLBACK_SUCCESS:
+		break;
+	default:
+		WARN_ON(1);
+		break;
 	}
+	spin_unlock(&proc->asts_spin);
 
-	if (cb.flags & DLM_CB_BAST) {
-		trace_dlm_bast(lkb->lkb_resource->res_ls, lkb, cb.mode);
-	} else if (cb.flags & DLM_CB_CAST) {
-		new_mode = cb.mode;
+	if (cb->flags & DLM_CB_BAST) {
+		trace_dlm_bast(lkb->lkb_resource->res_ls, lkb, cb->mode);
+	} else if (cb->flags & DLM_CB_CAST) {
+		new_mode = cb->mode;
 
-		if (!cb.sb_status && lkb->lkb_lksb->sb_lvbptr &&
+		if (!cb->sb_status && lkb->lkb_lksb->sb_lvbptr &&
 		    dlm_lvb_operations[old_mode + 1][new_mode + 1])
 			copy_lvb = 1;
 
-		lkb->lkb_lksb->sb_status = cb.sb_status;
-		lkb->lkb_lksb->sb_flags = cb.sb_flags;
+		lkb->lkb_lksb->sb_status = cb->sb_status;
+		lkb->lkb_lksb->sb_flags = cb->sb_flags;
 		trace_dlm_ast(lkb->lkb_resource->res_ls, lkb);
 	}
 
 	rv = copy_result_to_user(lkb->lkb_ua,
 				 test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
-				 cb.flags, cb.mode, copy_lvb, buf, count);
+				 cb->flags, cb->mode, copy_lvb, buf, count);
+
+	kref_put(&cb->ref, dlm_release_callback);
 
 	/* removes ref for proc->asts, may cause lkb to be freed */
-	if (!resid)
+	if (rv == DLM_DEQUEUE_CALLBACK_LAST)
 		dlm_put_lkb(lkb);
 
 	return rv;
diff --git a/fs/dlm/user.h b/fs/dlm/user.h
index 6b9bce6b96e0..33059452d79e 100644
--- a/fs/dlm/user.h
+++ b/fs/dlm/user.h
@@ -7,7 +7,7 @@
 #define __USER_DOT_H__
 
 void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
-                      int status, uint32_t sbflags, uint64_t seq);
+		      int status, uint32_t sbflags);
 int dlm_user_init(void);
 void dlm_user_exit(void);
 int dlm_device_deregister(struct dlm_ls *ls);
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 12/18] fs: dlm: allow different allocation context per _create_message
  2022-10-27 20:45 [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails Alexander Aring
                   ` (9 preceding siblings ...)
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 11/18] fs: dlm: use a non-static queue for callbacks Alexander Aring
@ 2022-10-27 20:45 ` Alexander Aring
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 13/18] fs: dlm: remove ls_remove_wait waitqueue Alexander Aring
                   ` (5 subsequent siblings)
  16 siblings, 0 replies; 20+ messages in thread
From: Alexander Aring @ 2022-10-27 20:45 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch gives the caller control over the allocation context on a
per-message basis. Currently all messages are forced to be created
under GFP_NOFS context.
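
For illustration only (not part of this patch), a minimal user-space
sketch of passing an allocation context down a call chain. The names
sketch_gfp_t, sketch_alloc_msg() and sketch_create_message() are
hypothetical stand-ins for gfp_t and the dlm helpers, and malloc()
merely stands in for the kernel allocator:

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for the kernel's gfp_t allocation flags. */
typedef enum { SKETCH_NOFS, SKETCH_ATOMIC } sketch_gfp_t;

struct sketch_msg {
	sketch_gfp_t allocated_with;	/* records the requested context */
	int type;
};

/* Lowest layer: uses whatever context the caller asked for. */
static struct sketch_msg *sketch_alloc_msg(sketch_gfp_t allocation)
{
	struct sketch_msg *m = malloc(sizeof(*m));

	if (m)
		m->allocated_with = allocation;
	return m;
}

/* Middle layer: does not pick a context itself, it only forwards it. */
static int sketch_create_message(int type, struct sketch_msg **ret,
				 sketch_gfp_t allocation)
{
	struct sketch_msg *m;

	assert(ret != NULL);

	m = sketch_alloc_msg(allocation);
	if (!m)
		return -1;

	m->type = type;
	*ret = m;
	return 0;
}
```

A caller that may sleep would pass SKETCH_NOFS here, while a caller
holding a spinlock would pass SKETCH_ATOMIC.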

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c     | 31 +++++++++++++++++++------------
 fs/dlm/memory.c   |  4 ++--
 fs/dlm/memory.h   |  2 +-
 fs/dlm/midcomms.c |  2 +-
 4 files changed, 23 insertions(+), 16 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index f4b2fb17bbb1..a2930e33c134 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3554,7 +3554,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
 static int _create_message(struct dlm_ls *ls, int mb_len,
 			   int to_nodeid, int mstype,
 			   struct dlm_message **ms_ret,
-			   struct dlm_mhandle **mh_ret)
+			   struct dlm_mhandle **mh_ret,
+			   gfp_t allocation)
 {
 	struct dlm_message *ms;
 	struct dlm_mhandle *mh;
@@ -3564,7 +3565,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
 	   pass into midcomms_commit and a message buffer (mb) that we
 	   write our data into */
 
-	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
+	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, allocation, &mb);
 	if (!mh)
 		return -ENOBUFS;
 
@@ -3586,7 +3587,8 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
 			  int to_nodeid, int mstype,
 			  struct dlm_message **ms_ret,
-			  struct dlm_mhandle **mh_ret)
+			  struct dlm_mhandle **mh_ret,
+			  gfp_t allocation)
 {
 	int mb_len = sizeof(struct dlm_message);
 
@@ -3607,7 +3609,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
 	}
 
 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
-			       ms_ret, mh_ret);
+			       ms_ret, mh_ret, allocation);
 }
 
 /* further lowcomms enhancements or alternate implementations may make
@@ -3676,7 +3678,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
 	if (error)
 		return error;
 
-	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
+	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
 	if (error)
 		goto fail;
 
@@ -3737,7 +3739,8 @@ static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
 	to_nodeid = lkb->lkb_nodeid;
 
-	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
+	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh,
+			       GFP_NOFS);
 	if (error)
 		goto out;
 
@@ -3758,7 +3761,8 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
 
 	to_nodeid = lkb->lkb_nodeid;
 
-	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
+	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh,
+			       GFP_NOFS);
 	if (error)
 		goto out;
 
@@ -3783,7 +3787,8 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
 	if (error)
 		return error;
 
-	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
+	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh,
+			       GFP_NOFS);
 	if (error)
 		goto fail;
 
@@ -3807,7 +3812,8 @@ static int send_remove(struct dlm_rsb *r)
 
 	to_nodeid = dlm_dir_nodeid(r);
 
-	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
+	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh,
+			       GFP_NOFS);
 	if (error)
 		goto out;
 
@@ -3828,7 +3834,7 @@ static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 
 	to_nodeid = lkb->lkb_nodeid;
 
-	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
+	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
 	if (error)
 		goto out;
 
@@ -3869,7 +3875,8 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
 	struct dlm_mhandle *mh;
 	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
 
-	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
+	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh,
+			       GFP_NOFS);
 	if (error)
 		goto out;
 
@@ -6295,7 +6302,7 @@ static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
 	int error;
 
 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
-				DLM_MSG_PURGE, &ms, &mh);
+				DLM_MSG_PURGE, &ms, &mh, GFP_NOFS);
 	if (error)
 		return error;
 	ms->m_nodeid = cpu_to_le32(nodeid);
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index 61fe0d1f5646..eb7a08641fcf 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -134,9 +134,9 @@ void dlm_free_lkb(struct dlm_lkb *lkb)
 	kmem_cache_free(lkb_cache, lkb);
 }
 
-struct dlm_mhandle *dlm_allocate_mhandle(void)
+struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation)
 {
-	return kmem_cache_alloc(mhandle_cache, GFP_NOFS);
+	return kmem_cache_alloc(mhandle_cache, allocation);
 }
 
 void dlm_free_mhandle(struct dlm_mhandle *mhandle)
diff --git a/fs/dlm/memory.h b/fs/dlm/memory.h
index c1583ec8b2cf..6b29563d24f7 100644
--- a/fs/dlm/memory.h
+++ b/fs/dlm/memory.h
@@ -20,7 +20,7 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls);
 void dlm_free_lkb(struct dlm_lkb *l);
 char *dlm_allocate_lvb(struct dlm_ls *ls);
 void dlm_free_lvb(char *l);
-struct dlm_mhandle *dlm_allocate_mhandle(void);
+struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation);
 void dlm_free_mhandle(struct dlm_mhandle *mhandle);
 struct writequeue_entry *dlm_allocate_writequeue(void);
 void dlm_free_writequeue(struct writequeue_entry *writequeue);
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index e10a6e97df44..3a4e20d6cd2d 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -1097,7 +1097,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 	/* this is a bug, however we going on and hope it will be resolved */
 	WARN_ON(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
 
-	mh = dlm_allocate_mhandle();
+	mh = dlm_allocate_mhandle(allocation);
 	if (!mh)
 		goto err;
 
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 13/18] fs: dlm: remove ls_remove_wait waitqueue
  2022-10-27 20:45 [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails Alexander Aring
                   ` (10 preceding siblings ...)
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 12/18] fs: dlm: allow different allocation context per _create_message Alexander Aring
@ 2022-10-27 20:45 ` Alexander Aring
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 14/18] fs: dlm: relax sending to allow receiving Alexander Aring
                   ` (4 subsequent siblings)
  16 siblings, 0 replies; 20+ messages in thread
From: Alexander Aring @ 2022-10-27 20:45 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch removes the ls_remove_wait waitqueue handling. The current
handling tries to wait before a lookup is sent out for an identical
resource name which is going to be removed. The remove message should
be sent out before the new lookup message. The reason is that after a
lookup request and response the node will actually use the specific
remote rsb. A subsequent remove message would delete the rsb on the
remote side while it is still being used.

To reach a similar behaviour we simply send the remove message out while
the rsb lookup lock is held and the rsb is removed from the toss list.
Other find_rsb() calls then never have the chance to bring an rsb back
to life while a remove message is being sent out (without holding the
lock).

This behaviour requires sending from a non-sleepable context, which
should be possible now and might be the reason why it was not
implemented this way in the first place.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  4 ----
 fs/dlm/lock.c         | 56 ++-----------------------------------------
 fs/dlm/lockspace.c    |  3 ---
 3 files changed, 2 insertions(+), 61 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index c03f34fed5e9..167998abf034 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -592,11 +592,7 @@ struct dlm_ls {
 	int			ls_new_rsb_count;
 	struct list_head	ls_new_rsb;	/* new rsb structs */
 
-	spinlock_t		ls_remove_spin;
-	wait_queue_head_t	ls_remove_wait;
-	char			ls_remove_name[DLM_RESNAME_MAXLEN+1];
 	char			*ls_remove_names[DLM_REMOVE_NAMES_MAX];
-	int			ls_remove_len;
 	int			ls_remove_lens[DLM_REMOVE_NAMES_MAX];
 
 	struct list_head	ls_nodes;	/* current nodes in ls */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index a2930e33c134..e1adfa5aed05 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1589,37 +1589,6 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
 	return error;
 }
 
-/* If there's an rsb for the same resource being removed, ensure
- * that the remove message is sent before the new lookup message.
- */
-
-#define DLM_WAIT_PENDING_COND(ls, r)		\
-	(ls->ls_remove_len &&			\
-	 !rsb_cmp(r, ls->ls_remove_name,	\
-		  ls->ls_remove_len))
-
-static void wait_pending_remove(struct dlm_rsb *r)
-{
-	struct dlm_ls *ls = r->res_ls;
- restart:
-	spin_lock(&ls->ls_remove_spin);
-	if (DLM_WAIT_PENDING_COND(ls, r)) {
-		log_debug(ls, "delay lookup for remove dir %d %s",
-			  r->res_dir_nodeid, r->res_name);
-		spin_unlock(&ls->ls_remove_spin);
-		wait_event(ls->ls_remove_wait, !DLM_WAIT_PENDING_COND(ls, r));
-		goto restart;
-	}
-	spin_unlock(&ls->ls_remove_spin);
-}
-
-/*
- * ls_remove_spin protects ls_remove_name and ls_remove_len which are
- * read by other threads in wait_pending_remove.  ls_remove_names
- * and ls_remove_lens are only used by the scan thread, so they do
- * not need protection.
- */
-
 static void shrink_bucket(struct dlm_ls *ls, int b)
 {
 	struct rb_node *n, *next;
@@ -1701,11 +1670,6 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 	 * list and sending the removal.  Keeping this gap small is
 	 * important to keep us (the master node) from being out of sync
 	 * with the remote dir node for very long.
-	 *
-	 * From the time the rsb is removed from toss until just after
-	 * send_remove, the rsb name is saved in ls_remove_name.  A new
-	 * lookup checks this to ensure that a new lookup message for the
-	 * same resource name is not sent just before the remove message.
 	 */
 
 	for (i = 0; i < remote_count; i++) {
@@ -1752,22 +1716,8 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		}
 
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-
-		/* block lookup of same name until we've sent remove */
-		spin_lock(&ls->ls_remove_spin);
-		ls->ls_remove_len = len;
-		memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
-		spin_unlock(&ls->ls_remove_spin);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
-
 		send_remove(r);
-
-		/* allow lookup of name again */
-		spin_lock(&ls->ls_remove_spin);
-		ls->ls_remove_len = 0;
-		memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
-		spin_unlock(&ls->ls_remove_spin);
-		wake_up(&ls->ls_remove_wait);
+		spin_unlock(&ls->ls_rsbtbl[b].lock);
 
 		dlm_free_rsb(r);
 	}
@@ -2718,8 +2668,6 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
 		return 0;
 	}
 
-	wait_pending_remove(r);
-
 	r->res_first_lkid = lkb->lkb_id;
 	send_lookup(r, lkb);
 	return 1;
@@ -3813,7 +3761,7 @@ static int send_remove(struct dlm_rsb *r)
 	to_nodeid = dlm_dir_nodeid(r);
 
 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh,
-			       GFP_NOFS);
+			       GFP_ATOMIC);
 	if (error)
 		goto out;
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 72fa8a9d7a40..c438b909f221 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -524,9 +524,6 @@ static int new_lockspace(const char *name, const char *cluster,
 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
 	}
 
-	spin_lock_init(&ls->ls_remove_spin);
-	init_waitqueue_head(&ls->ls_remove_wait);
-
 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
 						 GFP_KERNEL);
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 14/18] fs: dlm: relax sending to allow receiving
  2022-10-27 20:45 [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails Alexander Aring
                   ` (11 preceding siblings ...)
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 13/18] fs: dlm: remove ls_remove_wait waitqueue Alexander Aring
@ 2022-10-27 20:45 ` Alexander Aring
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 15/18] fs: dlm: catch dlm_add_member() error Alexander Aring
                   ` (3 subsequent siblings)
  16 siblings, 0 replies; 20+ messages in thread
From: Alexander Aring @ 2022-10-27 20:45 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch additionally drops the sock_mutex when there is a burst of
outgoing messages. Since we have acknowledge handling, we free sending
buffers only when we receive an ack back. If we are stuck looping in
send_to_sock() because dlm sends a lot of messages and we never leave
the loop, the sending buffers fill up very quickly. We can't receive
during this iteration because the sock_mutex is held. This patch
unlocks the sock_mutex so that it is possible to receive messages while
a burst of sending messages happens. This allows memory to be freed
because acks which are already received can be processed.
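
For illustration only (not part of this patch), a minimal user-space
sketch of the "drop the lock after a bounded burst" pattern. struct
sketch_con, its counters and SKETCH_MAX_SEND_MSG_COUNT are hypothetical;
a pthread mutex stands in for sock_mutex and decrementing a counter
stands in for sending one message:

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical burst limit, modelled on MAX_SEND_MSG_COUNT. */
#define SKETCH_MAX_SEND_MSG_COUNT 25

struct sketch_con {
	pthread_mutex_t sock_mutex;
	int pending;	/* messages still queued for sending */
	int lock_drops;	/* how often the lock was released mid-burst */
};

static void sketch_con_init(struct sketch_con *con, int pending)
{
	pthread_mutex_init(&con->sock_mutex, NULL);
	con->pending = pending;
	con->lock_drops = 0;
}

static void sketch_send_to_sock(struct sketch_con *con)
{
	int count;

	assert(con->pending >= 0);

again:
	count = 0;

	pthread_mutex_lock(&con->sock_mutex);
	while (con->pending > 0) {
		con->pending--;	/* "send" one message */

		/* After a full burst, release the lock so that a
		 * receiver waiting on it gets a chance to run.
		 */
		if (++count >= SKETCH_MAX_SEND_MSG_COUNT) {
			con->lock_drops++;
			pthread_mutex_unlock(&con->sock_mutex);
			goto again;
		}
	}
	pthread_mutex_unlock(&con->sock_mutex);
}
```

With 60 queued messages this sketch releases the lock twice before the
queue is drained, which is where a receiver could process acks.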

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 871d4e9f49fb..b05c6d9b5102 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1418,7 +1418,10 @@ static void send_to_sock(struct connection *con)
 	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
 	struct writequeue_entry *e;
 	int len, offset, ret;
-	int count = 0;
+	int count;
+
+again:
+	count = 0;
 
 	mutex_lock(&con->sock_mutex);
 	if (con->sock == NULL)
@@ -1453,14 +1456,16 @@ static void send_to_sock(struct connection *con)
 		} else if (ret < 0)
 			goto out;
 
+		spin_lock(&con->writequeue_lock);
+		writequeue_entry_complete(e, ret);
+
 		/* Don't starve people filling buffers */
 		if (++count >= MAX_SEND_MSG_COUNT) {
+			spin_unlock(&con->writequeue_lock);
+			mutex_unlock(&con->sock_mutex);
 			cond_resched();
-			count = 0;
+			goto again;
 		}
-
-		spin_lock(&con->writequeue_lock);
-		writequeue_entry_complete(e, ret);
 	}
 	spin_unlock(&con->writequeue_lock);
 
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 15/18] fs: dlm: catch dlm_add_member() error
  2022-10-27 20:45 [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails Alexander Aring
                   ` (12 preceding siblings ...)
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 14/18] fs: dlm: relax sending to allow receiving Alexander Aring
@ 2022-10-27 20:45 ` Alexander Aring
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 16/18] fs: dlm: fix log of lowcomms vs midcomms Alexander Aring
                   ` (2 subsequent siblings)
  16 siblings, 0 replies; 20+ messages in thread
From: Alexander Aring @ 2022-10-27 20:45 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch catches a possible dlm_add_member() error and delivers it to
the dlm recovery handling.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/member.c | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index 2af2ccfe43a9..923c01a8a0aa 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -573,7 +573,10 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
 		node = &rv->nodes[i];
 		if (dlm_is_member(ls, node->nodeid))
 			continue;
-		dlm_add_member(ls, node);
+		error = dlm_add_member(ls, node);
+		if (error)
+			return error;
+
 		log_rinfo(ls, "add member %d", node->nodeid);
 	}
 
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 16/18] fs: dlm: fix log of lowcomms vs midcomms
  2022-10-27 20:45 [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails Alexander Aring
                   ` (13 preceding siblings ...)
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 15/18] fs: dlm: catch dlm_add_member() error Alexander Aring
@ 2022-10-27 20:45 ` Alexander Aring
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 17/18] fs: dlm: use WARN_ON_ONCE() instead of WARN_ON() Alexander Aring
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 18/18] fs: dlm: rework lowcomms handling Alexander Aring
  16 siblings, 0 replies; 20+ messages in thread
From: Alexander Aring @ 2022-10-27 20:45 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch fixes a small issue in the message printed when
dlm_midcomms_start() fails to start. It reported that the dlm
subcomponent lowcomms had failed, but lowcomms sits behind the midcomms
layer.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lockspace.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index c438b909f221..cb7531ed89bc 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -391,7 +391,7 @@ static int threads_start(void)
 	/* Thread for sending/receiving messages for all lockspace's */
 	error = dlm_midcomms_start();
 	if (error) {
-		log_print("cannot start dlm lowcomms %d", error);
+		log_print("cannot start dlm midcomms %d", error);
 		goto scand_fail;
 	}
 
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 17/18] fs: dlm: use WARN_ON_ONCE() instead of WARN_ON()
  2022-10-27 20:45 [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails Alexander Aring
                   ` (14 preceding siblings ...)
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 16/18] fs: dlm: fix log of lowcomms vs midcomms Alexander Aring
@ 2022-10-27 20:45 ` Alexander Aring
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 18/18] fs: dlm: rework lowcomms handling Alexander Aring
  16 siblings, 0 replies; 20+ messages in thread
From: Alexander Aring @ 2022-10-27 20:45 UTC (permalink / raw)
  To: cluster-devel.redhat.com

To avoid spamming the console with WARN_ON() output about invalid states
in the dlm midcomms hot path, we switch to WARN_ON_ONCE() so that a
possible issue with the midcomms state handling is reported only once.
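
For illustration only (not part of this patch), a rough user-space
approximation of the "warn once per call site" idea. SKETCH_WARN_ON_ONCE,
struct sketch_node and sketch_receive() are hypothetical, and unlike the
kernel macro this sketch does not evaluate to the condition:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

static int sketch_warnings_printed;

/* Each expansion gets its own static flag, so every call site
 * reports at most once no matter how often it is hit.
 */
#define SKETCH_WARN_ON_ONCE(cond)					\
	do {								\
		static bool warned;					\
		if ((cond) && !warned) {				\
			warned = true;					\
			sketch_warnings_printed++;			\
			fprintf(stderr, "WARNING at %s:%d\n",		\
				__FILE__, __LINE__);			\
		}							\
	} while (0)

struct sketch_node {
	int state;	/* negative means "unexpected state" here */
};

/* Stand-in for a hot path that is called for every message. */
static void sketch_receive(const struct sketch_node *node)
{
	assert(node != NULL);
	SKETCH_WARN_ON_ONCE(node->state < 0);
}
```

Calling sketch_receive() many times with a bad state prints a single
warning instead of one per message.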

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/midcomms.c | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 3a4e20d6cd2d..32194a750fe1 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -469,7 +469,7 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
 		spin_unlock(&node->state_lock);
 		log_print("%s: unexpected state: %d\n",
 			  __func__, node->state);
-		WARN_ON(1);
+		WARN_ON_ONCE(1);
 		return;
 	}
 	spin_unlock(&node->state_lock);
@@ -540,7 +540,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 				spin_unlock(&node->state_lock);
 				log_print("%s: unexpected state: %d\n",
 					  __func__, node->state);
-				WARN_ON(1);
+				WARN_ON_ONCE(1);
 				return;
 			}
 			spin_unlock(&node->state_lock);
@@ -548,7 +548,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 			set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
 			break;
 		default:
-			WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
+			WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
 			dlm_receive_buffer_3_2_trace(seq, p);
 			dlm_receive_buffer(p, node->nodeid);
 			set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
@@ -770,7 +770,7 @@ static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid)
 			goto out;
 		}
 
-		WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
+		WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
 		dlm_receive_buffer(p, nodeid);
 		break;
 	case DLM_OPTS:
@@ -1095,7 +1095,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 	}
 
 	/* this is a bug, however we going on and hope it will be resolved */
-	WARN_ON(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
+	WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
 
 	mh = dlm_allocate_mhandle(allocation);
 	if (!mh)
@@ -1127,7 +1127,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 		break;
 	default:
 		dlm_free_mhandle(mh);
-		WARN_ON(1);
+		WARN_ON_ONCE(1);
 		goto err;
 	}
 
@@ -1196,7 +1196,7 @@ void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh,
 		break;
 	default:
 		srcu_read_unlock(&nodes_srcu, mh->idx);
-		WARN_ON(1);
+		WARN_ON_ONCE(1);
 		break;
 	}
 }
@@ -1238,7 +1238,7 @@ static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
 		spin_unlock(&node->state_lock);
 		log_print("%s: unexpected state: %d\n",
 			  __func__, node->state);
-		WARN_ON(1);
+		WARN_ON_ONCE(1);
 		return;
 	}
 	spin_unlock(&node->state_lock);
@@ -1356,7 +1356,7 @@ static void midcomms_node_release(struct rcu_head *rcu)
 {
 	struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
 
-	WARN_ON(atomic_read(&node->send_queue_cnt));
+	WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
 	kfree(node);
 }
 
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 18/18] fs: dlm: rework lowcomms handling
  2022-10-27 20:45 [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails Alexander Aring
                   ` (15 preceding siblings ...)
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 17/18] fs: dlm: use WARN_ON_ONCE() instead of WARN_ON() Alexander Aring
@ 2022-10-27 20:45 ` Alexander Aring
  2022-10-28 21:25   ` Alexander Aring
  16 siblings, 1 reply; 20+ messages in thread
From: Alexander Aring @ 2022-10-27 20:45 UTC (permalink / raw)
  To: cluster-devel.redhat.com

This patch is a rework of the lowcomms handling. The main goal is to
let recvmsg() and sendpage() run in parallel, in two senses:
1. per connection and 2. recvmsg() and sendpage() don't block each
other.

Currently recvmsg()/sendpage() cannot run in parallel because the two
workqueues "dlm_recv" and "dlm_send" are ordered workqueues. Even
though we have two workqueues for receive and send, they block each
other if both schedule the same connection at the same time, because a
per-connection mutex prevents the socket from being released while it
is being used.

To make it more parallel we introduce one "dlm_io" workqueue which is
not an ordered workqueue; the number of workers is not limited. By
means of the per-connection SEND/RECV pending flags, workers are
scheduled in order per connection and per send and receive task. To get
rid of the mutex that blocks workers of the same connection from doing
socket handling, we switched to a semaphore which treats socket
operations as read lock and sock releases as write lock, to prevent
sock_release() being called while the socket is being used.
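
For illustration only, a minimal user-space sketch of how a pending
flag can keep one kind of work ordered per connection on an unordered
workqueue. This is an assumption about the general technique, not the
code of this patch: struct sketch_con, SKETCH_SEND_PENDING and the
helpers are hypothetical, a counter stands in for queueing work, and
the receive side would use its own flag in the same way:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical flag bit, loosely modelled on CF_SEND_PENDING. */
#define SKETCH_SEND_PENDING (1UL << 2)

struct sketch_con {
	atomic_ulong flags;
	int send_work_queued;	/* how often send work was really queued */
};

static void sketch_con_init(struct sketch_con *con)
{
	atomic_init(&con->flags, 0UL);
	con->send_work_queued = 0;
}

/* Queue send work only if none is pending for this connection.
 * Returns true if work was queued.
 */
static bool sketch_queue_send(struct sketch_con *con)
{
	unsigned long old;

	assert(con != NULL);

	/* Atomically set the flag and look at its previous value. */
	old = atomic_fetch_or(&con->flags, SKETCH_SEND_PENDING);
	if (old & SKETCH_SEND_PENDING)
		return false;	/* a send worker is already scheduled */

	con->send_work_queued++;
	return true;
}

/* Called by the send worker when it has finished its run. */
static void sketch_send_work_done(struct sketch_con *con)
{
	atomic_fetch_and(&con->flags, ~SKETCH_SEND_PENDING);
}
```

A second sketch_queue_send() before sketch_send_work_done() is a no-op,
so at most one send worker per connection is scheduled at a time.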

There might be more optimization possible by removing the semaphore and
replacing it with rcu; however, due to other circumstances, e.g. the
othercon behaviour, this change seems complicated. I added comments
about removing the othercon handling and moving to rcu handling. We
need to do that with the next dlm major version upgrade because it is
not backwards compatible with the current connect mechanism.

The processing of dlm messages still needs to be handled by an ordered
workqueue. A dlm_process ordered workqueue was introduced which gets
filled by the receive worker. This is probably the next bottleneck of
DLM, but the application can't currently parse dlm messages in
parallel. A comment was introduced about lifting the workqueue context
of dlm processing into a non-sleepable softirq.

The lifetime of a connection struct is now different because the
dlm_node_addr structure was merged into the connection struct. The
shutdown handling for TCP, which was there to handle half-open sockets,
was removed. We have application-level handling for dealing with
half-open connections, which is required by dlm and cluster manager
join/leave events. SCTP doesn't support half-closed sockets and the
application already makes sure never to run into such a case, therefore
we removed it.

To check whether lowcomms is currently running, and therefore cannot be
configured at this time, we introduce a function
dlm_lowcomms_is_running() and remove the dlm_allow_conn variable. The
synchronization of stopping all listen socket workers is now done
through its data ready socket callback.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/config.c    |    4 +-
 fs/dlm/lockspace.c |    5 +-
 fs/dlm/lowcomms.c  | 1453 ++++++++++++++++++++------------------------
 fs/dlm/lowcomms.h  |    6 +-
 fs/dlm/main.c      |    7 +-
 fs/dlm/midcomms.c  |   68 ++-
 fs/dlm/midcomms.h  |    4 +
 7 files changed, 730 insertions(+), 817 deletions(-)

diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index ac8b62106ce0..20b60709eccf 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -183,7 +183,7 @@ static int dlm_check_protocol_and_dlm_running(unsigned int x)
 		return -EINVAL;
 	}
 
-	if (dlm_allow_conn)
+	if (dlm_lowcomms_is_running())
 		return -EBUSY;
 
 	return 0;
@@ -194,7 +194,7 @@ static int dlm_check_zero_and_dlm_running(unsigned int x)
 	if (!x)
 		return -EINVAL;
 
-	if (dlm_allow_conn)
+	if (dlm_lowcomms_is_running())
 		return -EBUSY;
 
 	return 0;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index cb7531ed89bc..c088de918366 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -17,7 +17,6 @@
 #include "recoverd.h"
 #include "dir.h"
 #include "midcomms.h"
-#include "lowcomms.h"
 #include "config.h"
 #include "memory.h"
 #include "lock.h"
@@ -723,7 +722,7 @@ static int __dlm_new_lockspace(const char *name, const char *cluster,
 	if (!ls_count) {
 		dlm_scand_stop();
 		dlm_midcomms_shutdown();
-		dlm_lowcomms_stop();
+		dlm_midcomms_stop();
 	}
  out:
 	mutex_unlock(&ls_lock);
@@ -926,7 +925,7 @@ int dlm_release_lockspace(void *lockspace, int force)
 	if (!error)
 		ls_count--;
 	if (!ls_count)
-		dlm_lowcomms_stop();
+		dlm_midcomms_stop();
 	mutex_unlock(&ls_lock);
 
 	return error;
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index b05c6d9b5102..0f6c417e47ac 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -63,41 +63,48 @@
 
 #define NEEDED_RMEM (4*1024*1024)
 
-/* Number of messages to send before rescheduling */
-#define MAX_SEND_MSG_COUNT 25
-#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
-
 struct connection {
 	struct socket *sock;	/* NULL if not connected */
 	uint32_t nodeid;	/* So we know who we are in the list */
-	struct mutex sock_mutex;
+	/* this semaphore is used to allow parallel recv/send in read
+	 * lock mode. When we release a sock we need to held the write lock.
+	 *
+	 * However this is locking code and not nice. When we remove the
+	 * othercon handling we should use rcu subsystem to control release
+	 * sock at the right time.
+	 */
+	struct rw_semaphore sock_lock;
 	unsigned long flags;
-#define CF_READ_PENDING 1
-#define CF_WRITE_PENDING 2
-#define CF_INIT_PENDING 4
-#define CF_IS_OTHERCON 5
-#define CF_CLOSE 6
-#define CF_APP_LIMITED 7
-#define CF_CLOSING 8
-#define CF_SHUTDOWN 9
-#define CF_CONNECTED 10
-#define CF_RECONNECT 11
-#define CF_DELAY_CONNECT 12
-#define CF_EOF 13
+#define CF_APP_LIMITED 0
+#define CF_RECV_PENDING 1
+#define CF_SEND_PENDING 2
+#define CF_RECV_INTR 3
+#define CF_IO_STOP 4
 	struct list_head writequeue;  /* List of outgoing writequeue_entries */
 	spinlock_t writequeue_lock;
-	atomic_t writequeue_cnt;
 	int retries;
-#define MAX_CONNECT_RETRIES 3
 	struct hlist_node list;
+	/* due to some connect()/accept() races we currently have this
+	 * crossover connection attempt, a second connection for one node.
+	 *
+	 * There is a solution to avoid the race by introducing a connect
+	 * rule, e.g. our_nodeid > nodeid_to_connect decides who is allowed
+	 * to connect. The other side can still connect, but that is only
+	 * treated as a request for a reconnect.
+	 *
+	 * However, changing this behaviour will break backwards compatibility.
+	 * In a DLM protocol major version upgrade we should remove this!
+	 */
 	struct connection *othercon;
-	struct connection *sendcon;
-	struct work_struct rwork; /* Receive workqueue */
-	struct work_struct swork; /* Send workqueue */
-	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
-	unsigned char *rx_buf;
-	int rx_buflen;
+	struct work_struct rwork; /* receive worker */
+	struct work_struct swork; /* send worker */
+	unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
 	int rx_leftover;
+	int mark;
+	int addr_count;
+	int curr_addr_index;
+	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
+	spinlock_t addrs_lock;
 	struct rcu_head rcu;
 };
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
@@ -136,13 +143,12 @@ struct dlm_msg {
 	struct kref ref;
 };
 
-struct dlm_node_addr {
-	struct list_head list;
+struct processqueue_entry {
+	unsigned char *buf;
 	int nodeid;
-	int mark;
-	int addr_count;
-	int curr_addr_index;
-	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
+	int buflen;
+
+	struct list_head list;
 };
 
 struct dlm_proto_ops {
@@ -157,10 +163,6 @@ struct dlm_proto_ops {
 	int (*listen_validate)(void);
 	void (*listen_sockopts)(struct socket *sock);
 	int (*listen_bind)(struct socket *sock);
-	/* What to do to shutdown */
-	void (*shutdown_action)(struct connection *con);
-	/* What to do to eof check */
-	bool (*eof_condition)(struct connection *con);
 };
 
 static struct listen_sock_callbacks {
@@ -170,17 +172,13 @@ static struct listen_sock_callbacks {
 	void (*sk_write_space)(struct sock *);
 } listen_sock;
 
-static LIST_HEAD(dlm_node_addrs);
-static DEFINE_SPINLOCK(dlm_node_addrs_spin);
-
 static struct listen_connection listen_con;
-static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
+static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
 static int dlm_local_count;
-int dlm_allow_conn;
 
 /* Work queues */
-static struct workqueue_struct *recv_workqueue;
-static struct workqueue_struct *send_workqueue;
+static struct workqueue_struct *io_workqueue;
+static struct workqueue_struct *process_workqueue;
 
 static struct hlist_head connection_hash[CONN_HASH_SIZE];
 static DEFINE_SPINLOCK(connections_lock);
@@ -188,8 +186,43 @@ DEFINE_STATIC_SRCU(connections_srcu);
 
 static const struct dlm_proto_ops *dlm_proto_ops;
 
+#define DLM_IO_SUCCESS 0
+#define DLM_IO_END 1
+#define DLM_IO_EOF 2
+#define DLM_IO_RESCHED 3
+
 static void process_recv_sockets(struct work_struct *work);
 static void process_send_sockets(struct work_struct *work);
+static void process_dlm_messages(struct work_struct *work);
+
+static DECLARE_WORK(process_work, process_dlm_messages);
+static DEFINE_SPINLOCK(processqueue_lock);
+static bool process_dlm_messages_pending;
+static LIST_HEAD(processqueue);
+
+bool dlm_lowcomms_is_running(void)
+{
+	return !!listen_con.sock;
+}
+
+static void lowcomms_queue_swork(struct connection *con)
+{
+	WARN_ON_ONCE(!lockdep_is_held(&con->writequeue_lock));
+
+	if (!test_bit(CF_IO_STOP, &con->flags) &&
+	    !test_bit(CF_APP_LIMITED, &con->flags) &&
+	    !test_and_set_bit(CF_SEND_PENDING, &con->flags))
+		queue_work(io_workqueue, &con->swork);
+}
+
+static void lowcomms_queue_rwork(struct connection *con)
+{
+	WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
+
+	if (!test_bit(CF_IO_STOP, &con->flags) &&
+	    !test_and_set_bit(CF_RECV_PENDING, &con->flags))
+		queue_work(io_workqueue, &con->rwork);
+}
 
 static void writequeue_entry_ctor(void *data)
 {
@@ -214,15 +247,12 @@ static struct writequeue_entry *con_next_wq(struct connection *con)
 {
 	struct writequeue_entry *e;
 
-	if (list_empty(&con->writequeue))
-		return NULL;
-
-	e = list_first_entry(&con->writequeue, struct writequeue_entry,
-			     list);
+	e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
+				     list);
 	/* if len is zero nothing is to send, if there are users filling
 	 * buffers we wait until the users are done so we can send more.
 	 */
-	if (e->users || e->len == 0)
+	if (!e || e->users || e->len == 0)
 		return NULL;
 
 	return e;
@@ -240,28 +270,15 @@ static struct connection *__find_con(int nodeid, int r)
 	return NULL;
 }
 
-static bool tcp_eof_condition(struct connection *con)
-{
-	return atomic_read(&con->writequeue_cnt);
-}
-
-static int dlm_con_init(struct connection *con, int nodeid)
+static void dlm_con_init(struct connection *con, int nodeid)
 {
-	con->rx_buflen = dlm_config.ci_buffer_size;
-	con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
-	if (!con->rx_buf)
-		return -ENOMEM;
-
 	con->nodeid = nodeid;
-	mutex_init(&con->sock_mutex);
+	init_rwsem(&con->sock_lock);
 	INIT_LIST_HEAD(&con->writequeue);
 	spin_lock_init(&con->writequeue_lock);
-	atomic_set(&con->writequeue_cnt, 0);
 	INIT_WORK(&con->swork, process_send_sockets);
 	INIT_WORK(&con->rwork, process_recv_sockets);
-	init_waitqueue_head(&con->shutdown_wait);
-
-	return 0;
+	spin_lock_init(&con->addrs_lock);
 }
 
 /*
@@ -271,7 +288,7 @@ static int dlm_con_init(struct connection *con, int nodeid)
 static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 {
 	struct connection *con, *tmp;
-	int r, ret;
+	int r;
 
 	r = nodeid_hash(nodeid);
 	con = __find_con(nodeid, r);
@@ -282,11 +299,7 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 	if (!con)
 		return NULL;
 
-	ret = dlm_con_init(con, nodeid);
-	if (ret) {
-		kfree(con);
-		return NULL;
-	}
+	dlm_con_init(con, nodeid);
 
 	spin_lock(&connections_lock);
 	/* Because multiple workqueues/threads calls this function it can
@@ -298,7 +311,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 	tmp = __find_con(nodeid, r);
 	if (tmp) {
 		spin_unlock(&connections_lock);
-		kfree(con->rx_buf);
 		kfree(con);
 		return tmp;
 	}
@@ -309,29 +321,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 	return con;
 }
 
-/* Loop round all connections */
-static void foreach_conn(void (*conn_func)(struct connection *c))
-{
-	int i;
-	struct connection *con;
-
-	for (i = 0; i < CONN_HASH_SIZE; i++) {
-		hlist_for_each_entry_rcu(con, &connection_hash[i], list)
-			conn_func(con);
-	}
-}
-
-static struct dlm_node_addr *find_node_addr(int nodeid)
-{
-	struct dlm_node_addr *na;
-
-	list_for_each_entry(na, &dlm_node_addrs, list) {
-		if (na->nodeid == nodeid)
-			return na;
-	}
-	return NULL;
-}
-
 static int addr_compare(const struct sockaddr_storage *x,
 			const struct sockaddr_storage *y)
 {
@@ -365,40 +354,47 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
 			  unsigned int *mark)
 {
 	struct sockaddr_storage sas;
-	struct dlm_node_addr *na;
+	struct connection *con;
+	int idx;
 
 	if (!dlm_local_count)
 		return -1;
 
-	spin_lock(&dlm_node_addrs_spin);
-	na = find_node_addr(nodeid);
-	if (na && na->addr_count) {
-		memcpy(&sas, na->addr[na->curr_addr_index],
-		       sizeof(struct sockaddr_storage));
+	idx = srcu_read_lock(&connections_srcu);
+	con = nodeid2con(nodeid, 0);
+	if (!con) {
+		srcu_read_unlock(&connections_srcu, idx);
+		return -ENOENT;
+	}
 
-		if (try_new_addr) {
-			na->curr_addr_index++;
-			if (na->curr_addr_index == na->addr_count)
-				na->curr_addr_index = 0;
-		}
+	spin_lock(&con->addrs_lock);
+	if (!con->addr_count) {
+		spin_unlock(&con->addrs_lock);
+		srcu_read_unlock(&connections_srcu, idx);
+		return -ENOENT;
 	}
-	spin_unlock(&dlm_node_addrs_spin);
 
-	if (!na)
-		return -EEXIST;
+	memcpy(&sas, &con->addr[con->curr_addr_index],
+	       sizeof(struct sockaddr_storage));
 
-	if (!na->addr_count)
-		return -ENOENT;
+	if (try_new_addr) {
+		con->curr_addr_index++;
+		if (con->curr_addr_index == con->addr_count)
+			con->curr_addr_index = 0;
+	}
 
-	*mark = na->mark;
+	*mark = con->mark;
+	spin_unlock(&con->addrs_lock);
 
 	if (sas_out)
 		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
 
-	if (!sa_out)
+	if (!sa_out) {
+		srcu_read_unlock(&connections_srcu, idx);
 		return 0;
+	}
 
-	if (dlm_local_addr[0]->ss_family == AF_INET) {
+	if (dlm_local_addr[0].ss_family == AF_INET) {
 		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
 		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
 		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
@@ -408,43 +404,46 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
 		ret6->sin6_addr = in6->sin6_addr;
 	}
 
+	srcu_read_unlock(&connections_srcu, idx);
 	return 0;
 }
 
 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
 			  unsigned int *mark)
 {
-	struct dlm_node_addr *na;
-	int rv = -EEXIST;
-	int addr_i;
-
-	spin_lock(&dlm_node_addrs_spin);
-	list_for_each_entry(na, &dlm_node_addrs, list) {
-		if (!na->addr_count)
-			continue;
-
-		for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
-			if (addr_compare(na->addr[addr_i], addr)) {
-				*nodeid = na->nodeid;
-				*mark = na->mark;
-				rv = 0;
-				goto unlock;
+	struct connection *con;
+	int i, idx, addr_i;
+
+	idx = srcu_read_lock(&connections_srcu);
+	for (i = 0; i < CONN_HASH_SIZE; i++) {
+		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
+			WARN_ON_ONCE(!con->addr_count);
+
+			spin_lock(&con->addrs_lock);
+			for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
+				if (addr_compare(&con->addr[addr_i], addr)) {
+					*nodeid = con->nodeid;
+					*mark = con->mark;
+					spin_unlock(&con->addrs_lock);
+					srcu_read_unlock(&connections_srcu, idx);
+					return 0;
+				}
 			}
+			spin_unlock(&con->addrs_lock);
 		}
 	}
-unlock:
-	spin_unlock(&dlm_node_addrs_spin);
-	return rv;
+	srcu_read_unlock(&connections_srcu, idx);
+
+	return -ENOENT;
 }
 
-/* caller need to held dlm_node_addrs_spin lock */
-static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na,
-				     const struct sockaddr_storage *addr)
+static bool dlm_lowcomms_con_has_addr(const struct connection *con,
+				      const struct sockaddr_storage *addr)
 {
 	int i;
 
-	for (i = 0; i < na->addr_count; i++) {
-		if (addr_compare(na->addr[i], addr))
+	for (i = 0; i < con->addr_count; i++) {
+		if (addr_compare(&con->addr[i], addr))
 			return true;
 	}
 
@@ -453,118 +452,82 @@ static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na,
 
 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
 {
-	struct sockaddr_storage *new_addr;
-	struct dlm_node_addr *new_node, *na;
-	bool ret;
-
-	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
-	if (!new_node)
-		return -ENOMEM;
+	struct connection *con;
+	int ret, idx;
 
-	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
-	if (!new_addr) {
-		kfree(new_node);
+	idx = srcu_read_lock(&connections_srcu);
+	con = nodeid2con(nodeid, GFP_NOFS);
+	if (!con) {
+		srcu_read_unlock(&connections_srcu, idx);
 		return -ENOMEM;
 	}
 
-	memcpy(new_addr, addr, len);
-
-	spin_lock(&dlm_node_addrs_spin);
-	na = find_node_addr(nodeid);
-	if (!na) {
-		new_node->nodeid = nodeid;
-		new_node->addr[0] = new_addr;
-		new_node->addr_count = 1;
-		new_node->mark = dlm_config.ci_mark;
-		list_add(&new_node->list, &dlm_node_addrs);
-		spin_unlock(&dlm_node_addrs_spin);
+	spin_lock(&con->addrs_lock);
+	if (!con->addr_count) {
+		memcpy(&con->addr[0], addr, sizeof(*addr));
+		con->addr_count = 1;
+		con->mark = dlm_config.ci_mark;
+		spin_unlock(&con->addrs_lock);
+		srcu_read_unlock(&connections_srcu, idx);
 		return 0;
 	}
 
-	ret = dlm_lowcomms_na_has_addr(na, addr);
+	ret = dlm_lowcomms_con_has_addr(con, addr);
 	if (ret) {
-		spin_unlock(&dlm_node_addrs_spin);
-		kfree(new_addr);
-		kfree(new_node);
+		spin_unlock(&con->addrs_lock);
+		srcu_read_unlock(&connections_srcu, idx);
 		return -EEXIST;
 	}
 
-	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
-		spin_unlock(&dlm_node_addrs_spin);
-		kfree(new_addr);
-		kfree(new_node);
+	if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
+		spin_unlock(&con->addrs_lock);
+		srcu_read_unlock(&connections_srcu, idx);
 		return -ENOSPC;
 	}
 
-	na->addr[na->addr_count++] = new_addr;
-	spin_unlock(&dlm_node_addrs_spin);
-	kfree(new_node);
+	memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
+	spin_unlock(&con->addrs_lock);
+	srcu_read_unlock(&connections_srcu, idx);
 	return 0;
 }
 
 /* Data available on socket or listen socket received a connect */
 static void lowcomms_data_ready(struct sock *sk)
 {
-	struct connection *con;
+	struct connection *con = sock2con(sk);
 
-	con = sock2con(sk);
-	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
-		queue_work(recv_workqueue, &con->rwork);
-}
-
-static void lowcomms_listen_data_ready(struct sock *sk)
-{
-	if (!dlm_allow_conn)
-		return;
-
-	queue_work(recv_workqueue, &listen_con.rwork);
+	set_bit(CF_RECV_INTR, &con->flags);
+	lowcomms_queue_rwork(con);
 }
 
 static void lowcomms_write_space(struct sock *sk)
 {
-	struct connection *con;
-
-	con = sock2con(sk);
-	if (!con)
-		return;
-
-	if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
-		log_print("connected to node %d", con->nodeid);
-		queue_work(send_workqueue, &con->swork);
-		return;
-	}
+	struct connection *con = sock2con(sk);
 
 	clear_bit(SOCK_NOSPACE, &con->sock->flags);
 
+	spin_lock_bh(&con->writequeue_lock);
 	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
 		con->sock->sk->sk_write_pending--;
 		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
 	}
 
-	queue_work(send_workqueue, &con->swork);
-}
-
-static inline void lowcomms_connect_sock(struct connection *con)
-{
-	if (test_bit(CF_CLOSE, &con->flags))
-		return;
-	queue_work(send_workqueue, &con->swork);
-	cond_resched();
+	lowcomms_queue_swork(con);
+	spin_unlock_bh(&con->writequeue_lock);
 }
 
 static void lowcomms_state_change(struct sock *sk)
 {
 	/* SCTP layer is not calling sk_data_ready when the connection
-	 * is done, so we catch the signal through here. Also, it
-	 * doesn't switch socket state when entering shutdown, so we
-	 * skip the write in that case.
+	 * is done, so we catch the signal through here.
 	 */
-	if (sk->sk_shutdown) {
-		if (sk->sk_shutdown == RCV_SHUTDOWN)
-			lowcomms_data_ready(sk);
-	} else if (sk->sk_state == TCP_ESTABLISHED) {
-		lowcomms_write_space(sk);
-	}
+	if (sk->sk_shutdown == RCV_SHUTDOWN)
+		lowcomms_data_ready(sk);
+}
+
+static void lowcomms_listen_data_ready(struct sock *sk)
+{
+	queue_work(io_workqueue, &listen_con.rwork);
 }
 
 int dlm_lowcomms_connect_node(int nodeid)
@@ -576,47 +539,49 @@ int dlm_lowcomms_connect_node(int nodeid)
 		return 0;
 
 	idx = srcu_read_lock(&connections_srcu);
-	con = nodeid2con(nodeid, GFP_NOFS);
-	if (!con) {
+	con = nodeid2con(nodeid, 0);
+	if (WARN_ON_ONCE(!con)) {
 		srcu_read_unlock(&connections_srcu, idx);
-		return -ENOMEM;
+		return -ENOENT;
 	}
 
-	lowcomms_connect_sock(con);
+	down_read(&con->sock_lock);
+	if (!con->sock) {
+		spin_lock_bh(&con->writequeue_lock);
+		lowcomms_queue_swork(con);
+		spin_unlock_bh(&con->writequeue_lock);
+	}
+	up_read(&con->sock_lock);
 	srcu_read_unlock(&connections_srcu, idx);
 
+	cond_resched();
 	return 0;
 }
 
 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
 {
-	struct dlm_node_addr *na;
+	struct connection *con;
+	int idx;
 
-	spin_lock(&dlm_node_addrs_spin);
-	na = find_node_addr(nodeid);
-	if (!na) {
-		spin_unlock(&dlm_node_addrs_spin);
+	idx = srcu_read_lock(&connections_srcu);
+	con = nodeid2con(nodeid, 0);
+	if (!con) {
+		srcu_read_unlock(&connections_srcu, idx);
 		return -ENOENT;
 	}
 
-	na->mark = mark;
-	spin_unlock(&dlm_node_addrs_spin);
-
+	spin_lock(&con->addrs_lock);
+	con->mark = mark;
+	spin_unlock(&con->addrs_lock);
+	srcu_read_unlock(&connections_srcu, idx);
 	return 0;
 }
 
 static void lowcomms_error_report(struct sock *sk)
 {
-	struct connection *con;
-	void (*orig_report)(struct sock *) = NULL;
+	struct connection *con = sock2con(sk);
 	struct inet_sock *inet;
 
-	con = sock2con(sk);
-	if (con == NULL)
-		goto out;
-
-	orig_report = listen_sock.sk_error_report;
-
 	inet = inet_sk(sk);
 	switch (sk->sk_family) {
 	case AF_INET:
@@ -642,66 +607,21 @@ static void lowcomms_error_report(struct sock *sk)
 				   "invalid socket family %d set, "
 				   "sk_err=%d/%d\n", dlm_our_nodeid(),
 				   sk->sk_family, sk->sk_err, sk->sk_err_soft);
-		goto out;
-	}
-
-	/* below sendcon only handling */
-	if (test_bit(CF_IS_OTHERCON, &con->flags))
-		con = con->sendcon;
-
-	switch (sk->sk_err) {
-	case ECONNREFUSED:
-		set_bit(CF_DELAY_CONNECT, &con->flags);
-		break;
-	default:
 		break;
 	}
 
-	if (!test_and_set_bit(CF_RECONNECT, &con->flags))
-		queue_work(send_workqueue, &con->swork);
-
-out:
-	if (orig_report)
-		orig_report(sk);
-}
-
-/* Note: sk_callback_lock must be locked before calling this function. */
-static void save_listen_callbacks(struct socket *sock)
-{
-	struct sock *sk = sock->sk;
-
-	listen_sock.sk_data_ready = sk->sk_data_ready;
-	listen_sock.sk_state_change = sk->sk_state_change;
-	listen_sock.sk_write_space = sk->sk_write_space;
-	listen_sock.sk_error_report = sk->sk_error_report;
+	listen_sock.sk_error_report(sk);
 }
 
-static void restore_callbacks(struct socket *sock)
+static void restore_callbacks(struct sock *sk)
 {
-	struct sock *sk = sock->sk;
+	WARN_ON_ONCE(!lockdep_sock_is_held(sk));
 
-	lock_sock(sk);
 	sk->sk_user_data = NULL;
 	sk->sk_data_ready = listen_sock.sk_data_ready;
 	sk->sk_state_change = listen_sock.sk_state_change;
 	sk->sk_write_space = listen_sock.sk_write_space;
 	sk->sk_error_report = listen_sock.sk_error_report;
-	release_sock(sk);
-}
-
-static void add_listen_sock(struct socket *sock, struct listen_connection *con)
-{
-	struct sock *sk = sock->sk;
-
-	lock_sock(sk);
-	save_listen_callbacks(sock);
-	con->sock = sock;
-
-	sk->sk_user_data = con;
-	sk->sk_allocation = GFP_NOFS;
-	/* Install a data_ready callback */
-	sk->sk_data_ready = lowcomms_listen_data_ready;
-	release_sock(sk);
 }
 
 /* Make a socket active */
@@ -713,10 +633,10 @@ static void add_sock(struct socket *sock, struct connection *con)
 	con->sock = sock;
 
 	sk->sk_user_data = con;
-	/* Install a data_ready callback */
 	sk->sk_data_ready = lowcomms_data_ready;
 	sk->sk_write_space = lowcomms_write_space;
-	sk->sk_state_change = lowcomms_state_change;
+	if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
+		sk->sk_state_change = lowcomms_state_change;
 	sk->sk_allocation = GFP_NOFS;
 	sk->sk_error_report = lowcomms_error_report;
 	release_sock(sk);
@@ -727,7 +647,7 @@ static void add_sock(struct socket *sock, struct connection *con)
 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 			  int *addr_len)
 {
-	saddr->ss_family =  dlm_local_addr[0]->ss_family;
+	saddr->ss_family =  dlm_local_addr[0].ss_family;
 	if (saddr->ss_family == AF_INET) {
 		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
 		in4_addr->sin_port = cpu_to_be16(port);
@@ -773,42 +693,64 @@ static void free_entry(struct writequeue_entry *e)
 	}
 
 	list_del(&e->list);
-	atomic_dec(&e->con->writequeue_cnt);
 	kref_put(&e->ref, dlm_page_release);
 }
 
 static void dlm_close_sock(struct socket **sock)
 {
 	if (*sock) {
-		restore_callbacks(*sock);
+		lock_sock((*sock)->sk);
+		restore_callbacks((*sock)->sk);
+		release_sock((*sock)->sk);
 		sock_release(*sock);
 		*sock = NULL;
 	}
 }
 
-/* Close a remote connection and tidy up */
-static void close_connection(struct connection *con, bool and_other,
-			     bool tx, bool rx)
+
+static void allow_connection_io(struct connection *con)
 {
-	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
-	struct writequeue_entry *e;
+	if (con->othercon)
+		clear_bit(CF_IO_STOP, &con->othercon->flags);
+	clear_bit(CF_IO_STOP, &con->flags);
+}
 
-	if (tx && !closing && cancel_work_sync(&con->swork)) {
-		log_print("canceled swork for node %d", con->nodeid);
-		clear_bit(CF_WRITE_PENDING, &con->flags);
-	}
-	if (rx && !closing && cancel_work_sync(&con->rwork)) {
-		log_print("canceled rwork for node %d", con->nodeid);
-		clear_bit(CF_READ_PENDING, &con->flags);
+static void stop_connection_io(struct connection *con)
+{
+	if (con->othercon)
+		stop_connection_io(con->othercon);
+
+	down_write(&con->sock_lock);
+	if (con->sock) {
+		lock_sock(con->sock->sk);
+		restore_callbacks(con->sock->sk);
+
+		spin_lock_bh(&con->writequeue_lock);
+		set_bit(CF_IO_STOP, &con->flags);
+		spin_unlock_bh(&con->writequeue_lock);
+		release_sock(con->sock->sk);
+	} else {
+		spin_lock_bh(&con->writequeue_lock);
+		set_bit(CF_IO_STOP, &con->flags);
+		spin_unlock_bh(&con->writequeue_lock);
 	}
+	up_write(&con->sock_lock);
+
+	cancel_work_sync(&con->swork);
+	cancel_work_sync(&con->rwork);
+}
+
 
-	mutex_lock(&con->sock_mutex);
+/* Close a remote connection and tidy up */
+static void close_connection(struct connection *con, bool and_other)
+{
+	struct writequeue_entry *e;
+
+	down_write(&con->sock_lock);
 	dlm_close_sock(&con->sock);
 
-	if (con->othercon && and_other) {
-		/* Will only re-enter once. */
-		close_connection(con->othercon, false, tx, rx);
-	}
+	if (con->othercon && and_other)
+		close_connection(con->othercon, false);
 
 	/* if we send a writequeue entry only a half way, we drop the
 	 * whole entry because reconnection and that we not start of the
@@ -821,200 +763,172 @@ static void close_connection(struct connection *con, bool and_other,
 	 * our policy is to start on a clean state when disconnects, we don't
 	 * know what's send/received on transport layer in this case.
 	 */
-	spin_lock(&con->writequeue_lock);
+	spin_lock_bh(&con->writequeue_lock);
 	if (!list_empty(&con->writequeue)) {
 		e = list_first_entry(&con->writequeue, struct writequeue_entry,
 				     list);
 		if (e->dirty)
 			free_entry(e);
 	}
-	spin_unlock(&con->writequeue_lock);
+	spin_unlock_bh(&con->writequeue_lock);
 
 	con->rx_leftover = 0;
 	con->retries = 0;
 	clear_bit(CF_APP_LIMITED, &con->flags);
-	clear_bit(CF_CONNECTED, &con->flags);
-	clear_bit(CF_DELAY_CONNECT, &con->flags);
-	clear_bit(CF_RECONNECT, &con->flags);
-	clear_bit(CF_EOF, &con->flags);
-	mutex_unlock(&con->sock_mutex);
-	clear_bit(CF_CLOSING, &con->flags);
+	clear_bit(CF_RECV_PENDING, &con->flags);
+	clear_bit(CF_SEND_PENDING, &con->flags);
+	up_write(&con->sock_lock);
 }
 
-static void shutdown_connection(struct connection *con)
+static struct processqueue_entry *new_processqueue_entry(int nodeid,
+							 int buflen)
 {
-	int ret;
-
-	flush_work(&con->swork);
+	struct processqueue_entry *pentry;
 
-	mutex_lock(&con->sock_mutex);
-	/* nothing to shutdown */
-	if (!con->sock) {
-		mutex_unlock(&con->sock_mutex);
-		return;
-	}
+	pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
+	if (!pentry)
+		return NULL;
 
-	set_bit(CF_SHUTDOWN, &con->flags);
-	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
-	mutex_unlock(&con->sock_mutex);
-	if (ret) {
-		log_print("Connection %p failed to shutdown: %d will force close",
-			  con, ret);
-		goto force_close;
-	} else {
-		ret = wait_event_timeout(con->shutdown_wait,
-					 !test_bit(CF_SHUTDOWN, &con->flags),
-					 DLM_SHUTDOWN_WAIT_TIMEOUT);
-		if (ret == 0) {
-			log_print("Connection %p shutdown timed out, will force close",
-				  con);
-			goto force_close;
-		}
+	pentry->buf = kmalloc(buflen, GFP_NOFS);
+	if (!pentry->buf) {
+		kfree(pentry);
+		return NULL;
 	}
 
-	return;
-
-force_close:
-	clear_bit(CF_SHUTDOWN, &con->flags);
-	close_connection(con, false, true, true);
+	pentry->nodeid = nodeid;
+	return pentry;
 }
 
-static void dlm_tcp_shutdown(struct connection *con)
+static void free_processqueue_entry(struct processqueue_entry *pentry)
 {
-	if (con->othercon)
-		shutdown_connection(con->othercon);
-	shutdown_connection(con);
+	kfree(pentry->buf);
+	kfree(pentry);
 }
 
-static int con_realloc_receive_buf(struct connection *con, int newlen)
+static void process_dlm_messages(struct work_struct *work)
 {
-	unsigned char *newbuf;
+	struct processqueue_entry *pentry;
 
-	newbuf = kmalloc(newlen, GFP_NOFS);
-	if (!newbuf)
-		return -ENOMEM;
+	spin_lock(&processqueue_lock);
+	pentry = list_first_entry_or_null(&processqueue,
+					  struct processqueue_entry, list);
+	if (WARN_ON_ONCE(!pentry)) {
+		spin_unlock(&processqueue_lock);
+		return;
+	}
 
-	/* copy any leftover from last receive */
-	if (con->rx_leftover)
-		memmove(newbuf, con->rx_buf, con->rx_leftover);
+	list_del(&pentry->list);
+	spin_unlock(&processqueue_lock);
 
-	/* swap to new buffer space */
-	kfree(con->rx_buf);
-	con->rx_buflen = newlen;
-	con->rx_buf = newbuf;
+	for (;;) {
+		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
+					    pentry->buflen);
+		free_processqueue_entry(pentry);
+
+		spin_lock(&processqueue_lock);
+		pentry = list_first_entry_or_null(&processqueue,
+						  struct processqueue_entry, list);
+		if (!pentry) {
+			process_dlm_messages_pending = false;
+			spin_unlock(&processqueue_lock);
+			break;
+		}
 
-	return 0;
+		list_del(&pentry->list);
+		spin_unlock(&processqueue_lock);
+	}
 }
 
 /* Data received from remote end */
-static int receive_from_sock(struct connection *con)
+static int receive_from_sock(struct connection *con, int buflen)
 {
+	struct processqueue_entry *pentry;
+	int ret, buflen_real;
 	struct msghdr msg;
 	struct kvec iov;
-	int ret, buflen;
 
-	mutex_lock(&con->sock_mutex);
+	pentry = new_processqueue_entry(con->nodeid, buflen);
+	if (!pentry)
+		return DLM_IO_RESCHED;
 
-	if (con->sock == NULL) {
-		ret = -EAGAIN;
-		goto out_close;
-	}
+	memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);
 
-	/* realloc if we get new buffer size to read out */
-	buflen = dlm_config.ci_buffer_size;
-	if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
-		ret = con_realloc_receive_buf(con, buflen);
-		if (ret < 0)
-			goto out_resched;
-	}
+	/* calculate new buffer parameter regarding last receive and
+	 * possible leftover bytes
+	 */
+	iov.iov_base = pentry->buf + con->rx_leftover;
+	iov.iov_len = buflen - con->rx_leftover;
 
-	for (;;) {
-		/* calculate new buffer parameter regarding last receive and
-		 * possible leftover bytes
-		 */
-		iov.iov_base = con->rx_buf + con->rx_leftover;
-		iov.iov_len = con->rx_buflen - con->rx_leftover;
-
-		memset(&msg, 0, sizeof(msg));
-		msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
-		ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
-				     msg.msg_flags);
-		trace_dlm_recv(con->nodeid, ret);
-		if (ret == -EAGAIN)
-			break;
-		else if (ret <= 0)
-			goto out_close;
-
-		/* new buflen according readed bytes and leftover from last receive */
-		buflen = ret + con->rx_leftover;
-		ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
-		if (ret < 0)
-			goto out_close;
-
-		/* calculate leftover bytes from process and put it into begin of
-		 * the receive buffer, so next receive we have the full message
-		 * at the start address of the receive buffer.
-		 */
-		con->rx_leftover = buflen - ret;
-		if (con->rx_leftover) {
-			memmove(con->rx_buf, con->rx_buf + ret,
-				con->rx_leftover);
+	memset(&msg, 0, sizeof(msg));
+	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+	clear_bit(CF_RECV_INTR, &con->flags);
+again:
+	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
+			     msg.msg_flags);
+	trace_dlm_recv(con->nodeid, ret);
+	if (ret == -EAGAIN) {
+		lock_sock(con->sock->sk);
+		if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
+			release_sock(con->sock->sk);
+			goto again;
 		}
+
+		clear_bit(CF_RECV_PENDING, &con->flags);
+		release_sock(con->sock->sk);
+		free_processqueue_entry(pentry);
+		return DLM_IO_END;
+	} else if (ret == 0) {
+		/* close will clear CF_RECV_PENDING */
+		free_processqueue_entry(pentry);
+		return DLM_IO_EOF;
+	} else if (ret < 0) {
+		free_processqueue_entry(pentry);
+		return ret;
 	}
 
-	dlm_midcomms_receive_done(con->nodeid);
-	mutex_unlock(&con->sock_mutex);
-	return 0;
+	/* new buflen: bytes read plus leftover from last receive */
+	buflen_real = ret + con->rx_leftover;
+	ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
+					   buflen_real);
+	if (ret < 0) {
+		free_processqueue_entry(pentry);
+		return ret;
+	}
 
-out_resched:
-	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
-		queue_work(recv_workqueue, &con->rwork);
-	mutex_unlock(&con->sock_mutex);
-	return -EAGAIN;
-
-out_close:
-	if (ret == 0) {
-		log_print("connection %p got EOF from %d",
-			  con, con->nodeid);
-
-		if (dlm_proto_ops->eof_condition &&
-		    dlm_proto_ops->eof_condition(con)) {
-			set_bit(CF_EOF, &con->flags);
-			mutex_unlock(&con->sock_mutex);
-		} else {
-			mutex_unlock(&con->sock_mutex);
-			close_connection(con, false, true, false);
+	pentry->buflen = ret;
 
-			/* handling for tcp shutdown */
-			clear_bit(CF_SHUTDOWN, &con->flags);
-			wake_up(&con->shutdown_wait);
-		}
+	/* calculate leftover bytes from processing and move them to the
+	 * beginning of the receive buffer, so on the next receive the full
+	 * message is at the start address of the receive buffer.
+	 */
+	con->rx_leftover = buflen_real - ret;
+	memmove(con->rx_leftover_buf, pentry->buf + ret,
+		con->rx_leftover);
 
-		/* signal to breaking receive worker */
-		ret = -1;
-	} else {
-		mutex_unlock(&con->sock_mutex);
+	spin_lock(&processqueue_lock);
+	list_add_tail(&pentry->list, &processqueue);
+	if (!process_dlm_messages_pending) {
+		process_dlm_messages_pending = true;
+		queue_work(process_workqueue, &process_work);
 	}
-	return ret;
+	spin_unlock(&processqueue_lock);
+
+	return DLM_IO_SUCCESS;
 }
 
 /* Listening socket is busy, accept a connection */
-static int accept_from_sock(struct listen_connection *con)
+static int accept_from_sock(void)
 {
-	int result;
+	struct connection *newcon, *addcon;
 	struct sockaddr_storage peeraddr;
+	int len, idx, result, nodeid;
 	struct socket *newsock;
-	int len, idx;
-	int nodeid;
-	struct connection *newcon;
-	struct connection *addcon;
 	unsigned int mark;
 
-	if (!con->sock)
-		return -ENOTCONN;
-
-	result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
-	if (result < 0)
+	result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
+	if (result == -EAGAIN)
+		return DLM_IO_END;
+	else if (result < 0)
 		goto accept_err;
 
 	/* Get the connected socket's peer */
@@ -1062,16 +976,16 @@ static int accept_from_sock(struct listen_connection *con)
 	 *  In this case we store the incoming one in "othercon"
 	 */
 	idx = srcu_read_lock(&connections_srcu);
-	newcon = nodeid2con(nodeid, GFP_NOFS);
-	if (!newcon) {
+	newcon = nodeid2con(nodeid, 0);
+	if (WARN_ON_ONCE(!newcon)) {
 		srcu_read_unlock(&connections_srcu, idx);
-		result = -ENOMEM;
+		result = -ENOENT;
 		goto accept_err;
 	}
 
 	sock_set_mark(newsock->sk, mark);
 
-	mutex_lock(&newcon->sock_mutex);
+	down_write(&newcon->sock_lock);
 	if (newcon->sock) {
 		struct connection *othercon = newcon->othercon;
 
@@ -1079,33 +993,24 @@ static int accept_from_sock(struct listen_connection *con)
 			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
 			if (!othercon) {
 				log_print("failed to allocate incoming socket");
-				mutex_unlock(&newcon->sock_mutex);
+				up_write(&newcon->sock_lock);
 				srcu_read_unlock(&connections_srcu, idx);
 				result = -ENOMEM;
 				goto accept_err;
 			}
 
-			result = dlm_con_init(othercon, nodeid);
-			if (result < 0) {
-				kfree(othercon);
-				mutex_unlock(&newcon->sock_mutex);
-				srcu_read_unlock(&connections_srcu, idx);
-				goto accept_err;
-			}
-
-			lockdep_set_subclass(&othercon->sock_mutex, 1);
-			set_bit(CF_IS_OTHERCON, &othercon->flags);
+			dlm_con_init(othercon, nodeid);
+			lockdep_set_subclass(&othercon->sock_lock, 1);
 			newcon->othercon = othercon;
-			othercon->sendcon = newcon;
 		} else {
 			/* close other sock con if we have something new */
-			close_connection(othercon, false, true, false);
+			close_connection(othercon, false);
 		}
 
-		mutex_lock(&othercon->sock_mutex);
+		down_write(&othercon->sock_lock);
 		add_sock(newsock, othercon);
 		addcon = othercon;
-		mutex_unlock(&othercon->sock_mutex);
+		up_write(&othercon->sock_lock);
 	}
 	else {
 		/* accept copies the sk after we've saved the callbacks, so we
@@ -1114,28 +1019,29 @@ static int accept_from_sock(struct listen_connection *con)
 		add_sock(newsock, newcon);
 		addcon = newcon;
 	}
-
-	set_bit(CF_CONNECTED, &addcon->flags);
-	mutex_unlock(&newcon->sock_mutex);
+	up_write(&newcon->sock_lock);
 
 	/*
 	 * Add it to the active queue in case we got data
 	 * between processing the accept adding the socket
 	 * to the read_sockets list
 	 */
-	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
-		queue_work(recv_workqueue, &addcon->rwork);
-
+	lock_sock(addcon->sock->sk);
+	lowcomms_queue_rwork(addcon);
+	release_sock(addcon->sock->sk);
+
+	spin_lock_bh(&addcon->writequeue_lock);
+	if (!list_empty(&addcon->writequeue))
+		lowcomms_queue_swork(addcon);
+	spin_unlock_bh(&addcon->writequeue_lock);
 	srcu_read_unlock(&connections_srcu, idx);
 
-	return 0;
+	return DLM_IO_SUCCESS;
 
 accept_err:
 	if (newsock)
 		sock_release(newsock);
 
-	if (result != -EAGAIN)
-		log_print("error accepting connection from node: %d", result);
 	return result;
 }
 
@@ -1167,7 +1073,7 @@ static int sctp_bind_addrs(struct socket *sock, uint16_t port)
 	int i, addr_len, result = 0;
 
 	for (i = 0; i < dlm_local_count; i++) {
-		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
+		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
 		make_sockaddr(&localaddr, port, &addr_len);
 
 		if (!i)
@@ -1187,7 +1093,7 @@ static int sctp_bind_addrs(struct socket *sock, uint16_t port)
 /* Get local addresses */
 static void init_local(void)
 {
-	struct sockaddr_storage sas, *addr;
+	struct sockaddr_storage sas;
 	int i;
 
 	dlm_local_count = 0;
@@ -1195,21 +1101,10 @@ static void init_local(void)
 		if (dlm_our_addr(&sas, i))
 			break;
 
-		addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
-		if (!addr)
-			break;
-		dlm_local_addr[dlm_local_count++] = addr;
+		memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
 	}
 }
 
-static void deinit_local(void)
-{
-	int i;
-
-	for (i = 0; i < dlm_local_count; i++)
-		kfree(dlm_local_addr[i]);
-}
-
 static struct writequeue_entry *new_writequeue_entry(struct connection *con)
 {
 	struct writequeue_entry *entry;
@@ -1240,7 +1135,7 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
 {
 	struct writequeue_entry *e;
 
-	spin_lock(&con->writequeue_lock);
+	spin_lock_bh(&con->writequeue_lock);
 	if (!list_empty(&con->writequeue)) {
 		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
 		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
@@ -1263,14 +1158,13 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
 	kref_get(&e->ref);
 	*ppc = page_address(e->page);
 	e->end += len;
-	atomic_inc(&con->writequeue_cnt);
 	if (cb)
 		cb(data);
 
 	list_add_tail(&e->list, &con->writequeue);
 
 out:
-	spin_unlock(&con->writequeue_lock);
+	spin_unlock_bh(&con->writequeue_lock);
 	return e;
 };
 
@@ -1319,13 +1213,13 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
 	    len < sizeof(struct dlm_header)) {
 		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
 		log_print("failed to allocate a buffer of size %d", len);
-		WARN_ON(1);
+		WARN_ON_ONCE(1);
 		return NULL;
 	}
 
 	idx = srcu_read_lock(&connections_srcu);
-	con = nodeid2con(nodeid, allocation);
-	if (!con) {
+	con = nodeid2con(nodeid, 0);
+	if (WARN_ON_ONCE(!con)) {
 		srcu_read_unlock(&connections_srcu, idx);
 		return NULL;
 	}
@@ -1350,7 +1244,7 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
 	struct connection *con = e->con;
 	int users;
 
-	spin_lock(&con->writequeue_lock);
+	spin_lock_bh(&con->writequeue_lock);
 	kref_get(&msg->ref);
 	list_add(&msg->list, &e->msgs);
 
@@ -1359,13 +1253,11 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
 		goto out;
 
 	e->len = DLM_WQ_LENGTH_BYTES(e);
-	spin_unlock(&con->writequeue_lock);
 
-	queue_work(send_workqueue, &con->swork);
-	return;
+	lowcomms_queue_swork(con);
 
 out:
-	spin_unlock(&con->writequeue_lock);
+	spin_unlock_bh(&con->writequeue_lock);
 	return;
 }
 
@@ -1413,95 +1305,79 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
 }
 
 /* Send a message */
-static void send_to_sock(struct connection *con)
+static int send_to_sock(struct connection *con)
 {
 	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
 	struct writequeue_entry *e;
 	int len, offset, ret;
-	int count;
-
-again:
-	count = 0;
-
-	mutex_lock(&con->sock_mutex);
-	if (con->sock == NULL)
-		goto out_connect;
-
-	spin_lock(&con->writequeue_lock);
-	for (;;) {
-		e = con_next_wq(con);
-		if (!e)
-			break;
 
-		len = e->len;
-		offset = e->offset;
-		BUG_ON(len == 0 && e->users == 0);
-		spin_unlock(&con->writequeue_lock);
-
-		ret = kernel_sendpage(con->sock, e->page, offset, len,
-				      msg_flags);
-		trace_dlm_send(con->nodeid, ret);
-		if (ret == -EAGAIN || ret == 0) {
-			if (ret == -EAGAIN &&
-			    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
-			    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
-				/* Notify TCP that we're limited by the
-				 * application window size.
-				 */
-				set_bit(SOCK_NOSPACE, &con->sock->flags);
-				con->sock->sk->sk_write_pending++;
-			}
-			cond_resched();
-			goto out;
-		} else if (ret < 0)
-			goto out;
+	spin_lock_bh(&con->writequeue_lock);
+	e = con_next_wq(con);
+	if (!e) {
+		clear_bit(CF_SEND_PENDING, &con->flags);
+		spin_unlock_bh(&con->writequeue_lock);
+		return DLM_IO_END;
+	}
 
-		spin_lock(&con->writequeue_lock);
-		writequeue_entry_complete(e, ret);
+	len = e->len;
+	offset = e->offset;
+	WARN_ON_ONCE(len == 0 && e->users == 0);
+	spin_unlock_bh(&con->writequeue_lock);
 
-		/* Don't starve people filling buffers */
-		if (++count >= MAX_SEND_MSG_COUNT) {
-			spin_unlock(&con->writequeue_lock);
-			mutex_unlock(&con->sock_mutex);
-			cond_resched();
-			goto again;
+	ret = kernel_sendpage(con->sock, e->page, offset, len,
+			      msg_flags);
+	trace_dlm_send(con->nodeid, ret);
+	if (ret == -EAGAIN || ret == 0) {
+		lock_sock(con->sock->sk);
+		spin_lock_bh(&con->writequeue_lock);
+		if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
+		    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
+			/* Notify TCP that we're limited by the
+			 * application window size.
+			 */
+			set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
+			con->sock->sk->sk_write_pending++;
+
+			clear_bit(CF_SEND_PENDING, &con->flags);
+			spin_unlock_bh(&con->writequeue_lock);
+			release_sock(con->sock->sk);
+
+			/* wait for write_space() event */
+			return DLM_IO_END;
 		}
-	}
-	spin_unlock(&con->writequeue_lock);
-
-	/* close if we got EOF */
-	if (test_and_clear_bit(CF_EOF, &con->flags)) {
-		mutex_unlock(&con->sock_mutex);
-		close_connection(con, false, false, true);
+		spin_unlock_bh(&con->writequeue_lock);
+		release_sock(con->sock->sk);
 
-		/* handling for tcp shutdown */
-		clear_bit(CF_SHUTDOWN, &con->flags);
-		wake_up(&con->shutdown_wait);
-	} else {
-		mutex_unlock(&con->sock_mutex);
+		return DLM_IO_RESCHED;
+	} else if (ret < 0) {
+		return ret;
 	}
 
-	return;
-
-out:
-	mutex_unlock(&con->sock_mutex);
-	return;
+	spin_lock_bh(&con->writequeue_lock);
+	writequeue_entry_complete(e, ret);
+	spin_unlock_bh(&con->writequeue_lock);
 
-out_connect:
-	mutex_unlock(&con->sock_mutex);
-	queue_work(send_workqueue, &con->swork);
-	cond_resched();
+	return DLM_IO_SUCCESS;
 }
 
 static void clean_one_writequeue(struct connection *con)
 {
 	struct writequeue_entry *e, *safe;
 
-	spin_lock(&con->writequeue_lock);
+	spin_lock_bh(&con->writequeue_lock);
 	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
 		free_entry(e);
 	}
-	spin_unlock(&con->writequeue_lock);
+	spin_unlock_bh(&con->writequeue_lock);
+}
+
+static void connection_release(struct rcu_head *rcu)
+{
+	struct connection *con = container_of(rcu, struct connection, rcu);
+
+	WARN_ON_ONCE(!list_empty(&con->writequeue));
+	WARN_ON_ONCE(con->sock);
+	kfree(con);
 }
 
 /* Called from recovery when it knows that a node has
@@ -1509,290 +1385,285 @@ static void clean_one_writequeue(struct connection *con)
 int dlm_lowcomms_close(int nodeid)
 {
 	struct connection *con;
-	struct dlm_node_addr *na;
 	int idx;
 
 	log_print("closing connection to node %d", nodeid);
+
 	idx = srcu_read_lock(&connections_srcu);
 	con = nodeid2con(nodeid, 0);
-	if (con) {
-		set_bit(CF_CLOSE, &con->flags);
-		close_connection(con, true, true, true);
-		clean_one_writequeue(con);
-		if (con->othercon)
-			clean_one_writequeue(con->othercon);
+	if (WARN_ON_ONCE(!con)) {
+		srcu_read_unlock(&connections_srcu, idx);
+		return -ENOENT;
 	}
-	srcu_read_unlock(&connections_srcu, idx);
 
-	spin_lock(&dlm_node_addrs_spin);
-	na = find_node_addr(nodeid);
-	if (na) {
-		list_del(&na->list);
-		while (na->addr_count--)
-			kfree(na->addr[na->addr_count]);
-		kfree(na);
+	spin_lock(&connections_lock);
+	hlist_del_rcu(&con->list);
+	spin_unlock(&connections_lock);
+
+	stop_connection_io(con);
+	close_connection(con, true);
+
+	clean_one_writequeue(con);
+	call_srcu(&connections_srcu, &con->rcu, connection_release);
+	if (con->othercon) {
+		clean_one_writequeue(con->othercon);
+		if (con->othercon)
+			call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
 	}
-	spin_unlock(&dlm_node_addrs_spin);
+	srcu_read_unlock(&connections_srcu, idx);
 
 	return 0;
 }
 
-/* Receive workqueue function */
+/* Receive worker function */
 static void process_recv_sockets(struct work_struct *work)
 {
 	struct connection *con = container_of(work, struct connection, rwork);
+	int ret, buflen;
 
-	clear_bit(CF_READ_PENDING, &con->flags);
-	receive_from_sock(con);
+	down_read(&con->sock_lock);
+	if (!con->sock) {
+		up_read(&con->sock_lock);
+		return;
+	}
+
+	buflen = READ_ONCE(dlm_config.ci_buffer_size);
+	do {
+		ret = receive_from_sock(con, buflen);
+	} while (ret == DLM_IO_SUCCESS);
+	up_read(&con->sock_lock);
+
+	switch (ret) {
+	case DLM_IO_END:
+		dlm_midcomms_receive_done(con->nodeid);
+		break;
+	case DLM_IO_EOF:
+		close_connection(con, false);
+		break;
+	case DLM_IO_RESCHED:
+		cond_resched();
+		queue_work(io_workqueue, &con->rwork);
+		break;
+	default:
+		if (ret < 0) {
+			log_print("node %d recv error %d", con->nodeid, ret);
+			close_connection(con, false);
+			dlm_midcomms_unack_msg_resend(con->nodeid);
+			break;
+		}
+
+		WARN_ON_ONCE(1);
+		break;
+	}
 }
 
 static void process_listen_recv_socket(struct work_struct *work)
 {
 	int ret;
 
+	if (WARN_ON_ONCE(!listen_con.sock))
+		return;
+
 	do {
-		ret = accept_from_sock(&listen_con);
-	} while (!ret);
+		ret = accept_from_sock();
+	} while (ret == DLM_IO_SUCCESS);
+
+	if (ret < 0)
+		log_print("critical error accepting connection: %d", ret);
 }
 
-static void dlm_connect(struct connection *con)
+static int dlm_connect(struct connection *con)
 {
 	struct sockaddr_storage addr;
 	int result, addr_len;
 	struct socket *sock;
 	unsigned int mark;
 
-	/* Some odd races can cause double-connects, ignore them */
-	if (con->retries++ > MAX_CONNECT_RETRIES)
-		return;
-
-	if (con->sock) {
-		log_print("node %d already connected.", con->nodeid);
-		return;
-	}
-
 	memset(&addr, 0, sizeof(addr));
 	result = nodeid_to_addr(con->nodeid, &addr, NULL,
 				dlm_proto_ops->try_new_addr, &mark);
 	if (result < 0) {
 		log_print("no address for nodeid %d", con->nodeid);
-		return;
+		return result;
 	}
 
 	/* Create a socket to communicate with */
-	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
+	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
 				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
 	if (result < 0)
-		goto socket_err;
+		return result;
 
 	sock_set_mark(sock->sk, mark);
 	dlm_proto_ops->sockopts(sock);
 
-	add_sock(sock, con);
-
 	result = dlm_proto_ops->bind(sock);
-	if (result < 0)
-		goto add_sock_err;
+	if (result < 0) {
+		sock_release(sock);
+		return result;
+	}
+
+	add_sock(sock, con);
 
 	log_print_ratelimited("connecting to %d", con->nodeid);
 	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
 	result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
 					addr_len);
-	if (result < 0)
-		goto add_sock_err;
-
-	return;
-
-add_sock_err:
-	dlm_close_sock(&con->sock);
+	switch (result) {
+	case -EINPROGRESS:
+		/* not an error */
+		fallthrough;
+	case 0:
+		break;
+	default:
+		if (result < 0)
+			dlm_close_sock(&con->sock);
 
-socket_err:
-	/*
-	 * Some errors are fatal and this list might need adjusting. For other
-	 * errors we try again until the max number of retries is reached.
-	 */
-	if (result != -EHOSTUNREACH &&
-	    result != -ENETUNREACH &&
-	    result != -ENETDOWN &&
-	    result != -EINVAL &&
-	    result != -EPROTONOSUPPORT) {
-		log_print("connect %d try %d error %d", con->nodeid,
-			  con->retries, result);
-		msleep(1000);
-		lowcomms_connect_sock(con);
+		break;
 	}
+
+	return result;
 }
 
-/* Send workqueue function */
+/* Send worker function */
 static void process_send_sockets(struct work_struct *work)
 {
 	struct connection *con = container_of(work, struct connection, swork);
+	int ret;
 
-	WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));
-
-	clear_bit(CF_WRITE_PENDING, &con->flags);
-
-	if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
-		close_connection(con, false, false, true);
-		dlm_midcomms_unack_msg_resend(con->nodeid);
+	down_read(&con->sock_lock);
+	if (!con->sock) {
+		up_read(&con->sock_lock);
+		down_write(&con->sock_lock);
+		if (!con->sock) {
+			ret = dlm_connect(con);
+			switch (ret) {
+			case 0:
+				break;
+			case -EINPROGRESS:
+				up_write(&con->sock_lock);
+				cond_resched();
+				queue_work(io_workqueue, &con->swork);
+				return;
+			default:
+				up_write(&con->sock_lock);
+				log_print("connect to node %d try %d error %d",
+					  con->nodeid, con->retries++, ret);
+				msleep(1000);
+				/* For now we try forever to reconnect. In
+				 * future we should send a event to cluster
+				 * manager to fence itself after certain amount
+				 * of retries.
+				 */
+				queue_work(io_workqueue, &con->swork);
+				return;
+			}
+		}
+		downgrade_write(&con->sock_lock);
 	}
 
-	if (con->sock == NULL) {
-		if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
-			msleep(1000);
+	do {
+		ret = send_to_sock(con);
+	} while (ret == DLM_IO_SUCCESS);
+	up_read(&con->sock_lock);
 
-		mutex_lock(&con->sock_mutex);
-		dlm_connect(con);
-		mutex_unlock(&con->sock_mutex);
-	}
+	switch (ret) {
+	case DLM_IO_END:
+		break;
+	case DLM_IO_RESCHED:
+		cond_resched();
+		queue_work(io_workqueue, &con->swork);
+		break;
+	default:
+		if (ret < 0) {
+			log_print("node %d send error %d", con->nodeid, ret);
+			close_connection(con, false);
+			dlm_midcomms_unack_msg_resend(con->nodeid);
+			break;
+		}
 
-	if (!list_empty(&con->writequeue))
-		send_to_sock(con);
+		WARN_ON_ONCE(1);
+		break;
+	}
 }
 
 static void work_stop(void)
 {
-	if (recv_workqueue) {
-		destroy_workqueue(recv_workqueue);
-		recv_workqueue = NULL;
+	if (io_workqueue) {
+		destroy_workqueue(io_workqueue);
+		io_workqueue = NULL;
 	}
 
-	if (send_workqueue) {
-		destroy_workqueue(send_workqueue);
-		send_workqueue = NULL;
+	if (process_workqueue) {
+		destroy_workqueue(process_workqueue);
+		process_workqueue = NULL;
 	}
 }
 
 static int work_start(void)
 {
-	recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM);
-	if (!recv_workqueue) {
-		log_print("can't start dlm_recv");
+	io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM,
+				       0);
+	if (!io_workqueue) {
+		log_print("can't start dlm_io");
 		return -ENOMEM;
 	}
 
-	send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM);
-	if (!send_workqueue) {
-		log_print("can't start dlm_send");
-		destroy_workqueue(recv_workqueue);
-		recv_workqueue = NULL;
+	/* ordered dlm message process queue,
+	 * should be converted to a tasklet
+	 */
+	process_workqueue = alloc_ordered_workqueue("dlm_process",
+						    WQ_HIGHPRI | WQ_MEM_RECLAIM);
+	if (!process_workqueue) {
+		log_print("can't start dlm_process");
+		destroy_workqueue(io_workqueue);
+		io_workqueue = NULL;
 		return -ENOMEM;
 	}
 
 	return 0;
 }
 
-static void shutdown_conn(struct connection *con)
-{
-	if (dlm_proto_ops->shutdown_action)
-		dlm_proto_ops->shutdown_action(con);
-}
-
 void dlm_lowcomms_shutdown(void)
 {
-	int idx;
-
-	/* Set all the flags to prevent any
-	 * socket activity.
-	 */
-	dlm_allow_conn = 0;
-
-	if (recv_workqueue)
-		flush_workqueue(recv_workqueue);
-	if (send_workqueue)
-		flush_workqueue(send_workqueue);
+	/* stop lowcomms_listen_data_ready calls */
+	lock_sock(listen_con.sock->sk);
+	listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
+	release_sock(listen_con.sock->sk);
 
+	cancel_work_sync(&listen_con.rwork);
 	dlm_close_sock(&listen_con.sock);
 
-	idx = srcu_read_lock(&connections_srcu);
-	foreach_conn(shutdown_conn);
-	srcu_read_unlock(&connections_srcu, idx);
-}
-
-static void _stop_conn(struct connection *con, bool and_other)
-{
-	mutex_lock(&con->sock_mutex);
-	set_bit(CF_CLOSE, &con->flags);
-	set_bit(CF_READ_PENDING, &con->flags);
-	set_bit(CF_WRITE_PENDING, &con->flags);
-	if (con->sock && con->sock->sk) {
-		lock_sock(con->sock->sk);
-		con->sock->sk->sk_user_data = NULL;
-		release_sock(con->sock->sk);
-	}
-	if (con->othercon && and_other)
-		_stop_conn(con->othercon, false);
-	mutex_unlock(&con->sock_mutex);
+	flush_workqueue(process_workqueue);
 }
 
-static void stop_conn(struct connection *con)
+void dlm_lowcomms_shutdown_node(int nodeid, bool force)
 {
-	_stop_conn(con, true);
-}
+	struct connection *con;
+	int idx;
 
-static void connection_release(struct rcu_head *rcu)
-{
-	struct connection *con = container_of(rcu, struct connection, rcu);
+	idx = srcu_read_lock(&connections_srcu);
+	con = nodeid2con(nodeid, 0);
+	if (WARN_ON_ONCE(!con)) {
+		srcu_read_unlock(&connections_srcu, idx);
+		return;
+	}
 
-	kfree(con->rx_buf);
-	kfree(con);
-}
+	flush_work(&con->swork);
+	stop_connection_io(con);
+	WARN_ON_ONCE(!force && !list_empty(&con->writequeue));
 
-static void free_conn(struct connection *con)
-{
-	close_connection(con, true, true, true);
-	spin_lock(&connections_lock);
-	hlist_del_rcu(&con->list);
-	spin_unlock(&connections_lock);
-	if (con->othercon) {
-		clean_one_writequeue(con->othercon);
-		call_srcu(&connections_srcu, &con->othercon->rcu,
-			  connection_release);
-	}
 	clean_one_writequeue(con);
-	call_srcu(&connections_srcu, &con->rcu, connection_release);
-}
-
-static void work_flush(void)
-{
-	int ok;
-	int i;
-	struct connection *con;
+	if (con->othercon)
+		clean_one_writequeue(con->othercon);
 
-	do {
-		ok = 1;
-		foreach_conn(stop_conn);
-		if (recv_workqueue)
-			flush_workqueue(recv_workqueue);
-		if (send_workqueue)
-			flush_workqueue(send_workqueue);
-		for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
-			hlist_for_each_entry_rcu(con, &connection_hash[i],
-						 list) {
-				ok &= test_bit(CF_READ_PENDING, &con->flags);
-				ok &= test_bit(CF_WRITE_PENDING, &con->flags);
-				if (con->othercon) {
-					ok &= test_bit(CF_READ_PENDING,
-						       &con->othercon->flags);
-					ok &= test_bit(CF_WRITE_PENDING,
-						       &con->othercon->flags);
-				}
-			}
-		}
-	} while (!ok);
+	close_connection(con, true);
+	allow_connection_io(con);
+	srcu_read_unlock(&connections_srcu, idx);
 }
 
 void dlm_lowcomms_stop(void)
 {
-	int idx;
-
-	idx = srcu_read_lock(&connections_srcu);
-	work_flush();
-	foreach_conn(free_conn);
-	srcu_read_unlock(&connections_srcu, idx);
 	work_stop();
-	deinit_local();
-
 	dlm_proto_ops = NULL;
 }
 
@@ -1808,7 +1679,7 @@ static int dlm_listen_for_all(void)
 	if (result < 0)
 		return result;
 
-	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
+	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
 				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
 	if (result < 0) {
 		log_print("Can't create comms socket: %d", result);
@@ -1822,10 +1693,18 @@ static int dlm_listen_for_all(void)
 	if (result < 0)
 		goto out;
 
-	save_listen_callbacks(sock);
-	add_listen_sock(sock, &listen_con);
+	lock_sock(sock->sk);
+	listen_sock.sk_data_ready = sock->sk->sk_data_ready;
+	listen_sock.sk_write_space = sock->sk->sk_write_space;
+	listen_sock.sk_error_report = sock->sk->sk_error_report;
+	listen_sock.sk_state_change = sock->sk->sk_state_change;
+
+	listen_con.sock = sock;
+
+	sock->sk->sk_allocation = GFP_NOFS;
+	sock->sk->sk_data_ready = lowcomms_listen_data_ready;
+	release_sock(sock->sk);
 
-	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
 	result = sock->ops->listen(sock, 5);
 	if (result < 0) {
 		dlm_close_sock(&listen_con.sock);
@@ -1847,7 +1726,7 @@ static int dlm_tcp_bind(struct socket *sock)
 	/* Bind to our cluster-known address connecting to avoid
 	 * routing problems.
 	 */
-	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
+	memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
 	make_sockaddr(&src_addr, 0, &addr_len);
 
 	result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
@@ -1863,17 +1742,7 @@ static int dlm_tcp_bind(struct socket *sock)
 static int dlm_tcp_connect(struct connection *con, struct socket *sock,
 			   struct sockaddr *addr, int addr_len)
 {
-	int ret;
-
-	ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
-	switch (ret) {
-	case -EINPROGRESS:
-		fallthrough;
-	case 0:
-		return 0;
-	}
-
-	return ret;
+	return sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
 }
 
 static int dlm_tcp_listen_validate(void)
@@ -1904,8 +1773,8 @@ static int dlm_tcp_listen_bind(struct socket *sock)
 	int addr_len;
 
 	/* Bind to our port */
-	make_sockaddr(dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
-	return sock->ops->bind(sock, (struct sockaddr *)dlm_local_addr[0],
+	make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
+	return sock->ops->bind(sock, (struct sockaddr *)&dlm_local_addr[0],
 			       addr_len);
 }
 
@@ -1918,8 +1787,6 @@ static const struct dlm_proto_ops dlm_tcp_ops = {
 	.listen_validate = dlm_tcp_listen_validate,
 	.listen_sockopts = dlm_tcp_listen_sockopts,
 	.listen_bind = dlm_tcp_listen_bind,
-	.shutdown_action = dlm_tcp_shutdown,
-	.eof_condition = tcp_eof_condition,
 };
 
 static int dlm_sctp_bind(struct socket *sock)
@@ -1940,13 +1807,7 @@ static int dlm_sctp_connect(struct connection *con, struct socket *sock,
 	sock_set_sndtimeo(sock->sk, 5);
 	ret = sock->ops->connect(sock, addr, addr_len, 0);
 	sock_set_sndtimeo(sock->sk, 0);
-	if (ret < 0)
-		return ret;
-
-	if (!test_and_set_bit(CF_CONNECTED, &con->flags))
-		log_print("connected to node %d", con->nodeid);
-
-	return 0;
+	return ret;
 }
 
 static int dlm_sctp_listen_validate(void)
@@ -1986,11 +1847,7 @@ static const struct dlm_proto_ops dlm_sctp_ops = {
 
 int dlm_lowcomms_start(void)
 {
-	int error = -EINVAL;
-	int i;
-
-	for (i = 0; i < CONN_HASH_SIZE; i++)
-		INIT_HLIST_HEAD(&connection_hash[i]);
+	int error;
 
 	init_local();
 	if (!dlm_local_count) {
@@ -1999,13 +1856,9 @@ int dlm_lowcomms_start(void)
 		goto fail;
 	}
 
-	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
-
 	error = work_start();
 	if (error)
-		goto fail_local;
-
-	dlm_allow_conn = 1;
+		goto fail;
 
 	/* Start listening */
 	switch (dlm_config.ci_protocol) {
@@ -2031,24 +1884,38 @@ int dlm_lowcomms_start(void)
 fail_listen:
 	dlm_proto_ops = NULL;
 fail_proto_ops:
-	dlm_allow_conn = 0;
 	work_stop();
-fail_local:
-	deinit_local();
 fail:
 	return error;
 }
 
+void dlm_lowcomms_init(void)
+{
+	int i;
+
+	for (i = 0; i < CONN_HASH_SIZE; i++)
+		INIT_HLIST_HEAD(&connection_hash[i]);
+
+	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
+}
+
 void dlm_lowcomms_exit(void)
 {
-	struct dlm_node_addr *na, *safe;
+	struct connection *con;
+	int i, idx;
 
-	spin_lock(&dlm_node_addrs_spin);
-	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
-		list_del(&na->list);
-		while (na->addr_count--)
-			kfree(na->addr[na->addr_count]);
-		kfree(na);
+	idx = srcu_read_lock(&connections_srcu);
+	for (i = 0; i < CONN_HASH_SIZE; i++) {
+		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
+			spin_lock(&connections_lock);
+			hlist_del_rcu(&con->list);
+			spin_unlock(&connections_lock);
+
+			if (con->othercon)
+				call_srcu(&connections_srcu, &con->othercon->rcu,
+					  connection_release);
+			call_srcu(&connections_srcu, &con->rcu, connection_release);
+		}
 	}
-	spin_unlock(&dlm_node_addrs_spin);
+	srcu_read_unlock(&connections_srcu, idx);
 }
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 29369feea991..3e8dca66183b 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -29,12 +29,14 @@ static inline int nodeid_hash(int nodeid)
 	return nodeid & (CONN_HASH_SIZE-1);
 }
 
-/* switch to check if dlm is running */
-extern int dlm_allow_conn;
+/* check if dlm is running */
+bool dlm_lowcomms_is_running(void);
 
 int dlm_lowcomms_start(void);
 void dlm_lowcomms_shutdown(void);
+void dlm_lowcomms_shutdown_node(int nodeid, bool force);
 void dlm_lowcomms_stop(void);
+void dlm_lowcomms_init(void);
 void dlm_lowcomms_exit(void);
 int dlm_lowcomms_close(int nodeid);
 struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
diff --git a/fs/dlm/main.c b/fs/dlm/main.c
index 1c5be4b70ac1..a77338be3237 100644
--- a/fs/dlm/main.c
+++ b/fs/dlm/main.c
@@ -17,7 +17,7 @@
 #include "user.h"
 #include "memory.h"
 #include "config.h"
-#include "lowcomms.h"
+#include "midcomms.h"
 
 #define CREATE_TRACE_POINTS
 #include <trace/events/dlm.h>
@@ -30,6 +30,8 @@ static int __init init_dlm(void)
 	if (error)
 		goto out;
 
+	dlm_midcomms_init();
+
 	error = dlm_lockspace_init();
 	if (error)
 		goto out_mem;
@@ -66,6 +68,7 @@ static int __init init_dlm(void)
  out_lockspace:
 	dlm_lockspace_exit();
  out_mem:
+	dlm_midcomms_exit();
 	dlm_memory_exit();
  out:
 	return error;
@@ -79,7 +82,7 @@ static void __exit exit_dlm(void)
 	dlm_config_exit();
 	dlm_memory_exit();
 	dlm_lockspace_exit();
-	dlm_lowcomms_exit();
+	dlm_midcomms_exit();
 	dlm_unregister_debugfs();
 }
 
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 32194a750fe1..32a5d6478276 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -306,11 +306,11 @@ static void dlm_send_queue_flush(struct midcomms_node *node)
 	pr_debug("flush midcomms send queue of node %d\n", node->nodeid);
 
 	rcu_read_lock();
-	spin_lock(&node->send_queue_lock);
+	spin_lock_bh(&node->send_queue_lock);
 	list_for_each_entry_rcu(mh, &node->send_queue, list) {
 		dlm_mhandle_delete(node, mh);
 	}
-	spin_unlock(&node->send_queue_lock);
+	spin_unlock_bh(&node->send_queue_lock);
 	rcu_read_unlock();
 }
 
@@ -437,7 +437,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
 		}
 	}
 
-	spin_lock(&node->send_queue_lock);
+	spin_lock_bh(&node->send_queue_lock);
 	list_for_each_entry_rcu(mh, &node->send_queue, list) {
 		if (before(mh->seq, seq)) {
 			dlm_mhandle_delete(node, mh);
@@ -446,7 +446,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
 			break;
 		}
 	}
-	spin_unlock(&node->send_queue_lock);
+	spin_unlock_bh(&node->send_queue_lock);
 	rcu_read_unlock();
 }
 
@@ -890,12 +890,7 @@ static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
 	dlm_receive_buffer(p, nodeid);
 }
 
-/*
- * Called from the low-level comms layer to process a buffer of
- * commands.
- */
-
-int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
+int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len)
 {
 	const unsigned char *ptr = buf;
 	const struct dlm_header *hd;
@@ -930,6 +925,32 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
 		if (msglen > len)
 			break;
 
+		ret += msglen;
+		len -= msglen;
+		ptr += msglen;
+	}
+
+	return ret;
+}
+/*
+ * Called from the low-level comms layer to process a buffer of
+ * commands.
+ */
+
+int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
+{
+	const unsigned char *ptr = buf;
+	const struct dlm_header *hd;
+	uint16_t msglen;
+	int ret = 0;
+
+	while (len >= sizeof(struct dlm_header)) {
+		hd = (struct dlm_header *)ptr;
+
+		msglen = le16_to_cpu(hd->h_length);
+		if (msglen > len)
+			break;
+
 		switch (hd->h_version) {
 		case cpu_to_le32(DLM_VERSION_3_1):
 			dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
@@ -1046,9 +1067,9 @@ static void midcomms_new_msg_cb(void *data)
 
 	atomic_inc(&mh->node->send_queue_cnt);
 
-	spin_lock(&mh->node->send_queue_lock);
+	spin_lock_bh(&mh->node->send_queue_lock);
 	list_add_tail_rcu(&mh->list, &mh->node->send_queue);
-	spin_unlock(&mh->node->send_queue_lock);
+	spin_unlock_bh(&mh->node->send_queue_lock);
 
 	mh->seq = mh->node->seq_send++;
 }
@@ -1203,13 +1224,28 @@ void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh,
 #endif
 
 int dlm_midcomms_start(void)
+{
+	return dlm_lowcomms_start();
+}
+
+void dlm_midcomms_stop(void)
+{
+	dlm_lowcomms_stop();
+}
+
+void dlm_midcomms_init(void)
 {
 	int i;
 
 	for (i = 0; i < CONN_HASH_SIZE; i++)
 		INIT_HLIST_HEAD(&node_hash[i]);
 
-	return dlm_lowcomms_start();
+	dlm_lowcomms_init();
+}
+
+void dlm_midcomms_exit(void)
+{
+	dlm_lowcomms_exit();
 }
 
 static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
@@ -1409,11 +1445,13 @@ static void midcomms_shutdown(struct midcomms_node *node)
 		pr_debug("active shutdown timed out for node %d with state %s\n",
 			 node->nodeid, dlm_state_str(node->state));
 		midcomms_node_reset(node);
+		dlm_lowcomms_shutdown_node(node->nodeid, true);
 		return;
 	}
 
 	pr_debug("active shutdown done for node %d with state %s\n",
 		 node->nodeid, dlm_state_str(node->state));
+	dlm_lowcomms_shutdown_node(node->nodeid, false);
 }
 
 void dlm_midcomms_shutdown(void)
@@ -1421,6 +1459,8 @@ void dlm_midcomms_shutdown(void)
 	struct midcomms_node *node;
 	int i, idx;
 
+	dlm_lowcomms_shutdown();
+
 	mutex_lock(&close_lock);
 	idx = srcu_read_lock(&nodes_srcu);
 	for (i = 0; i < CONN_HASH_SIZE; i++) {
@@ -1438,8 +1478,6 @@ void dlm_midcomms_shutdown(void)
 	}
 	srcu_read_unlock(&nodes_srcu, idx);
 	mutex_unlock(&close_lock);
-
-	dlm_lowcomms_shutdown();
 }
 
 int dlm_midcomms_close(int nodeid)
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index d6286e80b077..bea1cee4279c 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -14,6 +14,7 @@
 
 struct midcomms_node;
 
+int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len);
 int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
 struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 					     gfp_t allocation, char **ppc);
@@ -21,6 +22,9 @@ void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
 				 int namelen);
 int dlm_midcomms_close(int nodeid);
 int dlm_midcomms_start(void);
+void dlm_midcomms_stop(void);
+void dlm_midcomms_init(void);
+void dlm_midcomms_exit(void);
 void dlm_midcomms_shutdown(void);
 void dlm_midcomms_add_member(int nodeid);
 void dlm_midcomms_remove_member(int nodeid);
-- 
2.31.1



* [Cluster-devel] [PATCH v6.1-rc1 11/18] fs: dlm: use a non-static queue for callbacks
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 11/18] fs: dlm: use a non-static queue for callbacks Alexander Aring
@ 2022-10-28 21:17   ` Alexander Aring
  0 siblings, 0 replies; 20+ messages in thread
From: Alexander Aring @ 2022-10-28 21:17 UTC (permalink / raw)
  To: cluster-devel.redhat.com

Hi,

On Thu, Oct 27, 2022 at 4:45 PM Alexander Aring <aahringo@redhat.com> wrote:
...
> +
> +       /* TODO */
> +       lkb->lkb_flags &= ~DLM_IFL_NEED_SCHED;

Sorry, I think I put a TODO there because I wanted to rename this flag
to DLM_IFL_CB_PENDING.

- Alex



* [Cluster-devel] [PATCH v6.1-rc1 18/18] fs: dlm: rework lowcomms handling
  2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 18/18] fs: dlm: rework lowcomms handling Alexander Aring
@ 2022-10-28 21:25   ` Alexander Aring
  0 siblings, 0 replies; 20+ messages in thread
From: Alexander Aring @ 2022-10-28 21:25 UTC (permalink / raw)
  To: cluster-devel.redhat.com

Hi,

On Thu, Oct 27, 2022 at 4:45 PM Alexander Aring <aahringo@redhat.com> wrote:
>
> This patch is a rework of lowcomms handling. The main goal is to let
> recvmsg() and sendpage() run in parallel, in two senses: 1. per
> connection, and 2. recvmsg() and sendpage() do not block each other.
>
> Currently recvmsg()/sendpage() cannot run in parallel because the two
> workqueues "dlm_recv" and "dlm_send" are ordered workqueues. Even though
> we have two workqueues for receive and send, they block each other if
> both workqueues schedule the same connection at the same time, because a
> per connection mutex prevents the socket from being released while it is
> in use.
>
> To make it more parallel we introduce one "dlm_io" workqueue which is
> not an ordered workqueue; the number of workers is not limited. Using
> per connection SEND/RECV pending flags, we schedule workers ordered per
> connection and per send and receive task. To get rid of the mutex that
> blocks workers doing socket handling on the same connection, we switched
> to a semaphore which handles socket operations as read locks and socket
> releases as write locks, to prevent sock_release() being called while
> the socket is being used.
>
> There might be further optimization by removing the semaphore and
> replacing it with rcu; however, due to other circumstances, e.g. the
> othercon behaviour, this change seems complicated. I added comments
> about removing the othercon handling and moving to rcu handling. We need
> to do that in the next dlm major version upgrade because it is not
> backwards compatible with the current connect mechanism.
>
> The processing of dlm messages still needs to be handled by an ordered
> workqueue. A "dlm_process" ordered workqueue was introduced which gets
> filled by the receive worker. This is probably the next bottleneck of
> DLM, but the application can't currently parse dlm messages in parallel.
> A comment was introduced about lifting the workqueue context of dlm
> processing into a non-sleepable softirq.
>
> The lifetime of a connection struct is now different because the
> dlm_node_addr structure was merged into the connection struct. The TCP
> shutdown handling that dealt with half-open sockets was removed. We
> have application handling for dealing with half-open connections,
> which is required by dlm and cluster manager join/leave events. SCTP
> doesn't support half-closed sockets and the application already ensures
> that we never run into such a case, therefore we removed it.
>
> To check whether lowcomms is currently running, and therefore cannot be
> configured at this time, we introduced the function
> dlm_lowcomms_is_running() and removed the dlm_allow_conn variable. The
> synchronization of stopping all listen socket workers is now done
> through the listen socket's data ready callback.
>
> Signed-off-by: Alexander Aring <aahringo@redhat.com>
> ---
>  fs/dlm/config.c    |    4 +-
>  fs/dlm/lockspace.c |    5 +-
>  fs/dlm/lowcomms.c  | 1453 ++++++++++++++++++++------------------------
>  fs/dlm/lowcomms.h  |    6 +-
>  fs/dlm/main.c      |    7 +-
>  fs/dlm/midcomms.c  |   68 ++-
>  fs/dlm/midcomms.h  |    4 +
>  7 files changed, 730 insertions(+), 817 deletions(-)
>
> diff --git a/fs/dlm/config.c b/fs/dlm/config.c
> index ac8b62106ce0..20b60709eccf 100644
> --- a/fs/dlm/config.c
> +++ b/fs/dlm/config.c
> @@ -183,7 +183,7 @@ static int dlm_check_protocol_and_dlm_running(unsigned int x)
>                 return -EINVAL;
>         }
>
> -       if (dlm_allow_conn)
...
> +
> +       switch (ret) {
> +       case DLM_IO_END:
> +               dlm_midcomms_receive_done(con->nodeid);
> +               break;
> +       case DLM_IO_EOF:
> +               close_connection(con, false);
> +               break;
> +       case DLM_IO_RESCHED:
> +               cond_resched();
> +               queue_work(io_workqueue, &con->rwork);
> +               break;
> +       default:
> +               if (ret < 0) {
> +                       log_print("node %d recv error %d", con->nodeid, ret);
> +                       close_connection(con, false);
> +                       dlm_midcomms_unack_msg_resend(con->nodeid);

I will change this so that close_connection() is only called on the send
side, and only dlm_midcomms_unack_msg_resend() remains here. Closing from
the receive worker is difficult to synchronize with the sending worker.
As we retransmit unacked dlm messages, we will hopefully get an error on
the send attempt, and that will close the socket.

The exception is the othercon: an othercon is only used for receiving,
so we will still call close_connection() here if it is an "othercon".
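
A minimal sketch of how the receive-side dispatch might look after the
change described above. It is one reading of this reply, not the final
patch: the numeric DLM_IO_* values, the is_othercon field and the
counters (which stand in for the real calls named in the comments) are
assumptions made for illustration.

```c
#include <stdbool.h>

/* Names follow the quoted hunk; the numeric values are assumptions. */
enum { DLM_IO_END = 1, DLM_IO_EOF = 2, DLM_IO_RESCHED = 3 };

/* Hypothetical connection stand-in; counters replace real side effects. */
struct conn {
	int nodeid;
	bool is_othercon;     /* assumed marker for a receive-only othercon */
	int receive_done;     /* dlm_midcomms_receive_done() calls */
	int closed;           /* close_connection() calls */
	int requeued;         /* queue_work() calls for the receive work */
	int resend_requested; /* dlm_midcomms_unack_msg_resend() calls */
};

static void handle_recv_result(struct conn *con, int ret)
{
	switch (ret) {
	case DLM_IO_END:
		con->receive_done++;
		break;
	case DLM_IO_EOF:
		/* unchanged from the quoted hunk */
		con->closed++;
		break;
	case DLM_IO_RESCHED:
		con->requeued++;
		break;
	default:
		if (ret < 0) {
			/*
			 * Only an othercon is closed from the receive side;
			 * for a normal connection the send side is expected
			 * to hit the error and close the socket itself.
			 */
			if (con->is_othercon)
				con->closed++;
			con->resend_requested++;
		}
		break;
	}
}
```

With a negative return value, a normal connection only gets the resend
request, while an othercon is also closed.
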

I am currently running a test with tcpkill to see how the
close/reconnect behaviour works out.
I know the patch is really, really big. I will try to split it into
smaller ones as far as possible.

- Alex



end of thread, other threads:[~2022-10-28 21:25 UTC | newest]

Thread overview: 20+ messages
2022-10-27 20:45 [Cluster-devel] [PATCH v6.1-rc1 01/18] fs: dlm: fix sock release if listen fails Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 02/18] fs: dlm: retry accept() until -EAGAIN or error returns Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 03/18] fs: dlm: remove send repeat remove handling Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 04/18] fs: dlm: use packet in dlm_mhandle Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 05/18] fd: dlm: trace send/recv of dlm message and rcom Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 06/18] fs: dlm: let dlm_add_cb queue work after resume only Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 07/18] fs: dlm: use list_first_entry marco Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 08/18] fs: dlm: convert ls_cb_mutex mutex to spinlock Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 09/18] fs: dlm: use spin lock instead of mutex Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 10/18] fs: dlm: move last cast bast time to function call Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 11/18] fs: dlm: use a non-static queue for callbacks Alexander Aring
2022-10-28 21:17   ` Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 12/18] fs: dlm: allow different allocation context per _create_message Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 13/18] fs: dlm: remove ls_remove_wait waitqueue Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 14/18] fs: dlm: relax sending to allow receiving Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 15/18] fs: dlm: catch dlm_add_member() error Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 16/18] fs: dlm: fix log of lowcomms vs midcomms Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 17/18] fs: dlm: use WARN_ON_ONCE() instead of WARN_ON() Alexander Aring
2022-10-27 20:45 ` [Cluster-devel] [PATCH v6.1-rc1 18/18] fs: dlm: rework lowcomms handling Alexander Aring
2022-10-28 21:25   ` Alexander Aring
