public inbox for gfs2@lists.linux.dev
* [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context
@ 2024-04-02 19:17 Alexander Aring
  2024-04-02 19:17 ` [PATCHv4 dlm/next 01/15] dlm: Simplify the allocation of slab caches in dlm_lowcomms_msg_cache_create Alexander Aring
                   ` (14 more replies)
  0 siblings, 15 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:17 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

Hi,

this is version 4 of my attempt to bring dlm message parsing to softirq
context. It only minimally improves the dlm message parsing, which still
processes one message after another. There is also still a context switch
involved, namely the dlm callback workqueue; this series mostly has the
effect that we call queue_work() more often. In the future we will get
rid of the dlm callback workqueue and call the DLM user callback
directly in the dlm message processing softirq context. However, this
requires that DLM users adapt to the changes and signal this capability
over a new lockspace flag.

- Alex

changes in v4:
 - add "dlm: Simplify the allocation of slab caches in
   dlm_lowcomms_msg_cache_create" again, as it was forgotten in the last
   patch series.
 - update commit messages to hopefully be more clear
 - use the new WQ_BH workqueue flag instead of tasklets
 - use trylock and return -EAGAIN if debugfs cannot access the waiters
   list because recovery is going on
 - move clearing of the root_list into its own goto label instead of
   doing it when it is already cleared
 - introduce dlm_refcount_dec_and_lock_bh(), which can hopefully be
   moved into the refcount code later, instead of calling
   local_bh_disable()/local_bh_enable() all the time

changes in v3:
 - resend patches that appeared on the mailing list in the meantime.
 - add "remove schedule in dlm receive path" to remove a schedule()
   in the dlm msg processing path.
 - change the commit message of "dlm: do dlm message processing in
   softirq context" to mention the

changes in v2:

Compared to v2, the root_list is split into the root_list and a
per-lockspace masters list. The root_list can become a stack variable
in ls_recover() as it is only used for recovery handling. The masters
list is somewhat special because it is used by other dlm nodes to dump
this node's master rsbs. The current implementation guarantees that
this happens in a very specific part of the recovery handling by using
a kind of distributed cluster barrier. I added more sanity checks for
this handling and a more per-node based recovery log mechanism. There
is also a TODO describing that we should keep track of all master rsbs
during lockspace locking handling instead of creating the list during
recovery handling, which I think should improve the handling and maybe
let us get rid of those barriers.

Alexander Aring (14):
  dlm: remove allocation parameter in msg allocation
  dlm: switch to GFP_ATOMIC in dlm allocations
  dlm: move root_list functionality to recover.c
  dlm: move master dir dump to own list
  dlm: move root_list to ls_recover() stack
  dlm: implement directory dump context
  dlm: drop holding waiters mutex in waiters recovery
  dlm: convert ls_waiters_mutex to spinlock
  dlm: convert res_lock to spinlock
  dlm: make requestqueue handling non sleepable
  dlm: ls_recv_active semaphore to rwlock
  dlm: remove schedule in dlm receive path
  dlm: convert message parsing locks to disable bh
  dlm: do dlm message processing in softirq context

Kunwu Chan (1):
  dlm: Simplify the allocation of slab caches in
    dlm_lowcomms_msg_cache_create

 fs/dlm/ast.c          |  12 +-
 fs/dlm/debug_fs.c     |  41 ++++---
 fs/dlm/dir.c          | 147 +++++++++++++++++++----
 fs/dlm/dir.h          |   3 +-
 fs/dlm/dlm_internal.h |  20 ++--
 fs/dlm/lock.c         | 271 +++++++++++++++++++++++-------------------
 fs/dlm/lock.h         |   7 +-
 fs/dlm/lockspace.c    |  65 +++++-----
 fs/dlm/lowcomms.c     |  62 ++++++----
 fs/dlm/lowcomms.h     |   5 +-
 fs/dlm/member.c       |  23 ++--
 fs/dlm/memory.c       |  14 +--
 fs/dlm/memory.h       |   4 +-
 fs/dlm/midcomms.c     |  64 +++++-----
 fs/dlm/midcomms.h     |   3 +-
 fs/dlm/rcom.c         |  33 +++--
 fs/dlm/recover.c      | 126 ++++++--------------
 fs/dlm/recover.h      |  10 +-
 fs/dlm/recoverd.c     | 142 ++++++++++++++++++----
 fs/dlm/requestqueue.c |  43 ++-----
 fs/dlm/user.c         |  32 ++---
 21 files changed, 647 insertions(+), 480 deletions(-)

-- 
2.43.0


^ permalink raw reply	[flat|nested] 16+ messages in thread

* [PATCHv4 dlm/next 01/15] dlm: Simplify the allocation of slab caches in dlm_lowcomms_msg_cache_create
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
@ 2024-04-02 19:17 ` Alexander Aring
  2024-04-02 19:17 ` [PATCHv4 dlm/next 02/15] dlm: remove allocation parameter in msg allocation Alexander Aring
                   ` (13 subsequent siblings)
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:17 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

From: Kunwu Chan <chentao@kylinos.cn>

Use the new KMEM_CACHE() macro instead of calling kmem_cache_create()
directly to simplify the creation of slab caches.

Signed-off-by: Kunwu Chan <chentao@kylinos.cn>
Acked-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 6296c62c10fa..712165a1e567 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -248,7 +248,7 @@ struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
 
 struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
 {
-	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
+	return KMEM_CACHE(dlm_msg, 0);
 }
 
 /* need to held writequeue_lock */
-- 
2.43.0



* [PATCHv4 dlm/next 02/15] dlm: remove allocation parameter in msg allocation
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
  2024-04-02 19:17 ` [PATCHv4 dlm/next 01/15] dlm: Simplify the allocation of slab caches in dlm_lowcomms_msg_cache_create Alexander Aring
@ 2024-04-02 19:17 ` Alexander Aring
  2024-04-02 19:17 ` [PATCHv4 dlm/next 03/15] dlm: switch to GFP_ATOMIC in dlm allocations Alexander Aring
                   ` (12 subsequent siblings)
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:17 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch removes the allocation context parameter for message
allocations and always uses GFP_ATOMIC instead. We are preparing to
process dlm messages in softirq context, where we cannot sleep, so it
is necessary to switch to GFP_ATOMIC allocations. To simplify the code
overall, we just drop the allocation flag and hardcode GFP_ATOMIC when
calling the allocation functions.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c     | 31 ++++++++++++-------------------
 fs/dlm/lowcomms.c | 16 +++++++---------
 fs/dlm/lowcomms.h |  5 ++---
 fs/dlm/memory.c   |  8 ++++----
 fs/dlm/memory.h   |  4 ++--
 fs/dlm/midcomms.c | 24 ++++++++++--------------
 fs/dlm/midcomms.h |  3 +--
 fs/dlm/rcom.c     |  7 +++----
 8 files changed, 41 insertions(+), 57 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index c8426f6f518c..2f94ffc3cf82 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3330,8 +3330,7 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
 static int _create_message(struct dlm_ls *ls, int mb_len,
 			   int to_nodeid, int mstype,
 			   struct dlm_message **ms_ret,
-			   struct dlm_mhandle **mh_ret,
-			   gfp_t allocation)
+			   struct dlm_mhandle **mh_ret)
 {
 	struct dlm_message *ms;
 	struct dlm_mhandle *mh;
@@ -3341,7 +3340,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
 	   pass into midcomms_commit and a message buffer (mb) that we
 	   write our data into */
 
-	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, allocation, &mb);
+	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
 	if (!mh)
 		return -ENOBUFS;
 
@@ -3363,8 +3362,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
 			  int to_nodeid, int mstype,
 			  struct dlm_message **ms_ret,
-			  struct dlm_mhandle **mh_ret,
-			  gfp_t allocation)
+			  struct dlm_mhandle **mh_ret)
 {
 	int mb_len = sizeof(struct dlm_message);
 
@@ -3385,7 +3383,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
 	}
 
 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
-			       ms_ret, mh_ret, allocation);
+			       ms_ret, mh_ret);
 }
 
 /* further lowcomms enhancements or alternate implementations may make
@@ -3454,7 +3452,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
 	if (error)
 		return error;
 
-	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
+	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
 	if (error)
 		goto fail;
 
@@ -3514,8 +3512,7 @@ static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
 	to_nodeid = lkb->lkb_nodeid;
 
-	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh,
-			       GFP_NOFS);
+	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
 	if (error)
 		goto out;
 
@@ -3536,8 +3533,7 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
 
 	to_nodeid = lkb->lkb_nodeid;
 
-	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh,
-			       GFP_NOFS);
+	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
 	if (error)
 		goto out;
 
@@ -3562,8 +3558,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
 	if (error)
 		return error;
 
-	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh,
-			       GFP_NOFS);
+	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
 	if (error)
 		goto fail;
 
@@ -3587,8 +3582,7 @@ static int send_remove(struct dlm_rsb *r)
 
 	to_nodeid = dlm_dir_nodeid(r);
 
-	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh,
-			       GFP_ATOMIC);
+	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
 	if (error)
 		goto out;
 
@@ -3609,7 +3603,7 @@ static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 
 	to_nodeid = lkb->lkb_nodeid;
 
-	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
+	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
 	if (error)
 		goto out;
 
@@ -3651,8 +3645,7 @@ static int send_lookup_reply(struct dlm_ls *ls,
 	struct dlm_mhandle *mh;
 	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
 
-	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh,
-			       GFP_NOFS);
+	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
 	if (error)
 		goto out;
 
@@ -6103,7 +6096,7 @@ static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
 	int error;
 
 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
-				DLM_MSG_PURGE, &ms, &mh, GFP_NOFS);
+				DLM_MSG_PURGE, &ms, &mh);
 	if (error)
 		return error;
 	ms->m_nodeid = cpu_to_le32(nodeid);
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 712165a1e567..ab2cfbd2ea77 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1229,14 +1229,13 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
 };
 
 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
-						gfp_t allocation, char **ppc,
-						void (*cb)(void *data),
+						char **ppc, void (*cb)(void *data),
 						void *data)
 {
 	struct writequeue_entry *e;
 	struct dlm_msg *msg;
 
-	msg = dlm_allocate_msg(allocation);
+	msg = dlm_allocate_msg();
 	if (!msg)
 		return NULL;
 
@@ -1261,9 +1260,8 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
  * dlm_lowcomms_commit_msg which is a must call if success
  */
 #ifndef __CHECKER__
-struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
-				     char **ppc, void (*cb)(void *data),
-				     void *data)
+struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
+				     void (*cb)(void *data), void *data)
 {
 	struct connection *con;
 	struct dlm_msg *msg;
@@ -1284,7 +1282,7 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
 		return NULL;
 	}
 
-	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
+	msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data);
 	if (!msg) {
 		srcu_read_unlock(&connections_srcu, idx);
 		return NULL;
@@ -1348,8 +1346,8 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
 	if (msg->retransmit)
 		return 1;
 
-	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
-					      GFP_ATOMIC, &ppc, NULL, NULL);
+	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc,
+					      NULL, NULL);
 	if (!msg_resend)
 		return -ENOMEM;
 
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 3e8dca66183b..8deb16f8f620 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -39,9 +39,8 @@ void dlm_lowcomms_stop(void);
 void dlm_lowcomms_init(void);
 void dlm_lowcomms_exit(void);
 int dlm_lowcomms_close(int nodeid);
-struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
-				     char **ppc, void (*cb)(void *data),
-				     void *data);
+struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
+				     void (*cb)(void *data), void *data);
 void dlm_lowcomms_commit_msg(struct dlm_msg *msg);
 void dlm_lowcomms_put_msg(struct dlm_msg *msg);
 int dlm_lowcomms_resend_msg(struct dlm_msg *msg);
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index be9398ddf357..ab663ca66aca 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -130,9 +130,9 @@ void dlm_free_lkb(struct dlm_lkb *lkb)
 	kmem_cache_free(lkb_cache, lkb);
 }
 
-struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation)
+struct dlm_mhandle *dlm_allocate_mhandle(void)
 {
-	return kmem_cache_alloc(mhandle_cache, allocation);
+	return kmem_cache_alloc(mhandle_cache, GFP_ATOMIC);
 }
 
 void dlm_free_mhandle(struct dlm_mhandle *mhandle)
@@ -150,9 +150,9 @@ void dlm_free_writequeue(struct writequeue_entry *writequeue)
 	kmem_cache_free(writequeue_cache, writequeue);
 }
 
-struct dlm_msg *dlm_allocate_msg(gfp_t allocation)
+struct dlm_msg *dlm_allocate_msg(void)
 {
-	return kmem_cache_alloc(msg_cache, allocation);
+	return kmem_cache_alloc(msg_cache, GFP_ATOMIC);
 }
 
 void dlm_free_msg(struct dlm_msg *msg)
diff --git a/fs/dlm/memory.h b/fs/dlm/memory.h
index 6b29563d24f7..15198d46b42a 100644
--- a/fs/dlm/memory.h
+++ b/fs/dlm/memory.h
@@ -20,11 +20,11 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls);
 void dlm_free_lkb(struct dlm_lkb *l);
 char *dlm_allocate_lvb(struct dlm_ls *ls);
 void dlm_free_lvb(char *l);
-struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation);
+struct dlm_mhandle *dlm_allocate_mhandle(void);
 void dlm_free_mhandle(struct dlm_mhandle *mhandle);
 struct writequeue_entry *dlm_allocate_writequeue(void);
 void dlm_free_writequeue(struct writequeue_entry *writequeue);
-struct dlm_msg *dlm_allocate_msg(gfp_t allocation);
+struct dlm_msg *dlm_allocate_msg(void);
 void dlm_free_msg(struct dlm_msg *msg);
 struct dlm_callback *dlm_allocate_cb(void);
 void dlm_free_cb(struct dlm_callback *cb);
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 8e9920f1b48b..ed6fb9b9a582 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -379,8 +379,7 @@ static int dlm_send_ack(int nodeid, uint32_t seq)
 	struct dlm_msg *msg;
 	char *ppc;
 
-	msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_ATOMIC, &ppc,
-				   NULL, NULL);
+	msg = dlm_lowcomms_new_msg(nodeid, mb_len, &ppc, NULL, NULL);
 	if (!msg)
 		return -ENOMEM;
 
@@ -428,7 +427,7 @@ static int dlm_send_fin(struct midcomms_node *node,
 	struct dlm_mhandle *mh;
 	char *ppc;
 
-	mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_ATOMIC, &ppc);
+	mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, &ppc);
 	if (!mh)
 		return -ENOMEM;
 
@@ -976,13 +975,13 @@ static void midcomms_new_msg_cb(void *data)
 }
 
 static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
-						int len, gfp_t allocation, char **ppc)
+						int len, char **ppc)
 {
 	struct dlm_opts *opts;
 	struct dlm_msg *msg;
 
 	msg = dlm_lowcomms_new_msg(nodeid, len + DLM_MIDCOMMS_OPT_LEN,
-				   allocation, ppc, midcomms_new_msg_cb, mh);
+				   ppc, midcomms_new_msg_cb, mh);
 	if (!msg)
 		return NULL;
 
@@ -1001,8 +1000,7 @@ static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int node
  * dlm_midcomms_commit_mhandle which is a must call if success
  */
 #ifndef __CHECKER__
-struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
-					     gfp_t allocation, char **ppc)
+struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, char **ppc)
 {
 	struct midcomms_node *node;
 	struct dlm_mhandle *mh;
@@ -1017,7 +1015,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 	/* this is a bug, however we going on and hope it will be resolved */
 	WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
 
-	mh = dlm_allocate_mhandle(allocation);
+	mh = dlm_allocate_mhandle();
 	if (!mh)
 		goto err;
 
@@ -1028,8 +1026,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 
 	switch (node->version) {
 	case DLM_VERSION_3_1:
-		msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc,
-					   NULL, NULL);
+		msg = dlm_lowcomms_new_msg(nodeid, len, ppc, NULL, NULL);
 		if (!msg) {
 			dlm_free_mhandle(mh);
 			goto err;
@@ -1040,8 +1037,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 		/* send ack back if necessary */
 		dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
 
-		msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
-					       ppc);
+		msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, ppc);
 		if (!msg) {
 			dlm_free_mhandle(mh);
 			goto err;
@@ -1501,8 +1497,8 @@ int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf,
 	rd.node = node;
 	rd.buf = buf;
 
-	msg = dlm_lowcomms_new_msg(node->nodeid, buflen, GFP_NOFS,
-				   &msgbuf, midcomms_new_rawmsg_cb, &rd);
+	msg = dlm_lowcomms_new_msg(node->nodeid, buflen, &msgbuf,
+				   midcomms_new_rawmsg_cb, &rd);
 	if (!msg)
 		return -ENOMEM;
 
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index e7246fb3ef57..278d26fdeb2c 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -16,8 +16,7 @@ struct midcomms_node;
 
 int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len);
 int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
-struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
-					     gfp_t allocation, char **ppc);
+struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, char **ppc);
 void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
 				 int namelen);
 int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 3b734aed26b5..2e3f529f3ff2 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -55,7 +55,7 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
 	struct dlm_mhandle *mh;
 	char *mb;
 
-	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
+	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
 	if (!mh) {
 		log_print("%s to %d type %d len %d ENOBUFS",
 			  __func__, to_nodeid, type, len);
@@ -75,8 +75,7 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
 	struct dlm_msg *msg;
 	char *mb;
 
-	msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb,
-				   NULL, NULL);
+	msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, &mb, NULL, NULL);
 	if (!msg) {
 		log_print("create_rcom to %d type %d len %d ENOBUFS",
 			  to_nodeid, type, len);
@@ -510,7 +509,7 @@ int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in)
 	char *mb;
 	int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
 
-	mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb);
+	mh = dlm_midcomms_get_mhandle(nodeid, mb_len, &mb);
 	if (!mh)
 		return -ENOBUFS;
 
-- 
2.43.0



* [PATCHv4 dlm/next 03/15] dlm: switch to GFP_ATOMIC in dlm allocations
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
  2024-04-02 19:17 ` [PATCHv4 dlm/next 01/15] dlm: Simplify the allocation of slab caches in dlm_lowcomms_msg_cache_create Alexander Aring
  2024-04-02 19:17 ` [PATCHv4 dlm/next 02/15] dlm: remove allocation parameter in msg allocation Alexander Aring
@ 2024-04-02 19:17 ` Alexander Aring
  2024-04-02 19:17 ` [PATCHv4 dlm/next 04/15] dlm: move root_list functionality to recover.c Alexander Aring
                   ` (11 subsequent siblings)
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:17 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch prepares to process dlm messages in softirq context. For
that, some code parts will either run inside softirq context or need
to run while a spinlock is held, so this patch switches the allocation
context to GFP_ATOMIC in those places. It is no longer possible to
preload idr allocations; however, preloading is only a performance
optimization, and we might switch to an xarray implementation with
more lockless-reader paradigms anyway.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c         | 2 --
 fs/dlm/memory.c       | 6 +++---
 fs/dlm/recover.c      | 2 --
 fs/dlm/requestqueue.c | 2 +-
 4 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 2f94ffc3cf82..d87464614bc5 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1206,13 +1206,11 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
 
-	idr_preload(GFP_NOFS);
 	spin_lock(&ls->ls_lkbidr_spin);
 	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
 	if (rv >= 0)
 		lkb->lkb_id = rv;
 	spin_unlock(&ls->ls_lkbidr_spin);
-	idr_preload_end();
 
 	if (rv < 0) {
 		log_error(ls, "create_lkb idr error %d", rv);
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index ab663ca66aca..15a8b1cee433 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -84,7 +84,7 @@ char *dlm_allocate_lvb(struct dlm_ls *ls)
 {
 	char *p;
 
-	p = kzalloc(ls->ls_lvblen, GFP_NOFS);
+	p = kzalloc(ls->ls_lvblen, GFP_ATOMIC);
 	return p;
 }
 
@@ -97,7 +97,7 @@ struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
 
-	r = kmem_cache_zalloc(rsb_cache, GFP_NOFS);
+	r = kmem_cache_zalloc(rsb_cache, GFP_ATOMIC);
 	return r;
 }
 
@@ -112,7 +112,7 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb;
 
-	lkb = kmem_cache_zalloc(lkb_cache, GFP_NOFS);
+	lkb = kmem_cache_zalloc(lkb_cache, GFP_ATOMIC);
 	return lkb;
 }
 
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 53917c0aa3c0..ce6dc914cb86 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -310,7 +310,6 @@ static int recover_idr_add(struct dlm_rsb *r)
 	struct dlm_ls *ls = r->res_ls;
 	int rv;
 
-	idr_preload(GFP_NOFS);
 	spin_lock(&ls->ls_recover_idr_lock);
 	if (r->res_id) {
 		rv = -1;
@@ -326,7 +325,6 @@ static int recover_idr_add(struct dlm_rsb *r)
 	rv = 0;
 out_unlock:
 	spin_unlock(&ls->ls_recover_idr_lock);
-	idr_preload_end();
 	return rv;
 }
 
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 892d6ca21e74..c05940afd063 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -37,7 +37,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
 	int length = le16_to_cpu(ms->m_header.h_length) -
 		sizeof(struct dlm_message);
 
-	e = kmalloc(sizeof(struct rq_entry) + length, GFP_NOFS);
+	e = kmalloc(sizeof(struct rq_entry) + length, GFP_ATOMIC);
 	if (!e) {
 		log_print("dlm_add_requestqueue: out of memory len %d", length);
 		return;
-- 
2.43.0



* [PATCHv4 dlm/next 04/15] dlm: move root_list functionality to recover.c
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
                   ` (2 preceding siblings ...)
  2024-04-02 19:17 ` [PATCHv4 dlm/next 03/15] dlm: switch to GFP_ATOMIC in dlm allocations Alexander Aring
@ 2024-04-02 19:17 ` Alexander Aring
  2024-04-02 19:18 ` [PATCHv4 dlm/next 05/15] dlm: move master dir dump to own list Alexander Aring
                   ` (10 subsequent siblings)
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:17 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch moves dlm_create_root_list() and dlm_release_root_list() to
recoverd.c and declares them static because they are only used there.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/recover.c  | 42 ------------------------------------------
 fs/dlm/recover.h  |  2 --
 fs/dlm/recoverd.c | 39 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 39 insertions(+), 44 deletions(-)

diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index ce6dc914cb86..6abc283f8f36 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -889,48 +889,6 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
 
 /* Create a single list of all root rsb's to be used during recovery */
 
-int dlm_create_root_list(struct dlm_ls *ls)
-{
-	struct rb_node *n;
-	struct dlm_rsb *r;
-	int i, error = 0;
-
-	down_write(&ls->ls_root_sem);
-	if (!list_empty(&ls->ls_root_list)) {
-		log_error(ls, "root list not empty");
-		error = -EINVAL;
-		goto out;
-	}
-
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock(&ls->ls_rsbtbl[i].lock);
-		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			list_add(&r->res_root_list, &ls->ls_root_list);
-			dlm_hold_rsb(r);
-		}
-
-		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
-			log_error(ls, "dlm_create_root_list toss not empty");
-		spin_unlock(&ls->ls_rsbtbl[i].lock);
-	}
- out:
-	up_write(&ls->ls_root_sem);
-	return error;
-}
-
-void dlm_release_root_list(struct dlm_ls *ls)
-{
-	struct dlm_rsb *r, *safe;
-
-	down_write(&ls->ls_root_sem);
-	list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
-		list_del_init(&r->res_root_list);
-		dlm_put_rsb(r);
-	}
-	up_write(&ls->ls_root_sem);
-}
-
 void dlm_clear_toss(struct dlm_ls *ls)
 {
 	struct rb_node *n, *next;
diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h
index dbc51013ecad..0b54550ee055 100644
--- a/fs/dlm/recover.h
+++ b/fs/dlm/recover.h
@@ -23,8 +23,6 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq);
 int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc);
 int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq);
 void dlm_recovered_lock(struct dlm_rsb *r);
-int dlm_create_root_list(struct dlm_ls *ls);
-void dlm_release_root_list(struct dlm_ls *ls);
 void dlm_clear_toss(struct dlm_ls *ls);
 void dlm_recover_rsbs(struct dlm_ls *ls);
 
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 4d17491dea2f..8eb42554ccb0 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -20,6 +20,45 @@
 #include "requestqueue.h"
 #include "recoverd.h"
 
+static void dlm_create_root_list(struct dlm_ls *ls)
+{
+	struct rb_node *n;
+	struct dlm_rsb *r;
+	int i;
+
+	down_write(&ls->ls_root_sem);
+	if (!list_empty(&ls->ls_root_list)) {
+		log_error(ls, "root list not empty");
+		goto out;
+	}
+
+	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
+		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
+		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+			list_add(&r->res_root_list, &ls->ls_root_list);
+			dlm_hold_rsb(r);
+		}
+
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
+			log_error(ls, "%s toss not empty", __func__);
+		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
+	}
+ out:
+	up_write(&ls->ls_root_sem);
+}
+
+static void dlm_release_root_list(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r, *safe;
+
+	down_write(&ls->ls_root_sem);
+	list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
+		list_del_init(&r->res_root_list);
+		dlm_put_rsb(r);
+	}
+	up_write(&ls->ls_root_sem);
+}
 
 /* If the start for which we're re-enabling locking (seq) has been superseded
    by a newer stop (ls_recover_seq), we need to leave locking disabled.
-- 
2.43.0



* [PATCHv4 dlm/next 05/15] dlm: move master dir dump to own list
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
                   ` (3 preceding siblings ...)
  2024-04-02 19:17 ` [PATCHv4 dlm/next 04/15] dlm: move root_list functionality to recover.c Alexander Aring
@ 2024-04-02 19:18 ` Alexander Aring
  2024-04-02 19:18 ` [PATCHv4 dlm/next 06/15] dlm: move root_list to ls_recover() stack Alexander Aring
                   ` (9 subsequent siblings)
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:18 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch moves the master directory dump, meaning dlm_rsbs of which
we are the master (res_nodeid == 0), to its own list handling.
Currently the only concurrent access to ls->ls_root_list is due to the
master directory dump. Putting it into its own list allows us to take
the root_list out of the global per-lockspace context and make it
lockless. While at it, move the rw semaphore to a rwlock as the
context allows it.

Add a comment that we should keep track of our own master rsbs while
locking occurs, instead of having recovery create them in a
snapshot-like mode.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dir.c          | 22 ++++++---------
 fs/dlm/dlm_internal.h |  3 ++
 fs/dlm/lock.c         |  2 ++
 fs/dlm/lockspace.c    |  2 ++
 fs/dlm/recoverd.c     | 64 +++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 79 insertions(+), 14 deletions(-)

diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index f6acba4310a7..10753486049a 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -216,16 +216,13 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 	if (!rv)
 		return r;
 
-	down_read(&ls->ls_root_sem);
-	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry(r, &ls->ls_masters_list, res_masters_list) {
 		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
-			up_read(&ls->ls_root_sem);
 			log_debug(ls, "find_rsb_root revert to root_list %s",
 				  r->res_name);
 			return r;
 		}
 	}
-	up_read(&ls->ls_root_sem);
 	return NULL;
 }
 
@@ -241,7 +238,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 	int offset = 0, dir_nodeid;
 	__be16 be_namelen;
 
-	down_read(&ls->ls_root_sem);
+	read_lock(&ls->ls_masters_lock);
 
 	if (inlen > 1) {
 		r = find_rsb_root(ls, inbuf, inlen);
@@ -250,16 +247,13 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 				  nodeid, inlen, inlen, inbuf);
 			goto out;
 		}
-		list = r->res_root_list.next;
+		list = r->res_masters_list.next;
 	} else {
-		list = ls->ls_root_list.next;
+		list = ls->ls_masters_list.next;
 	}
 
-	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
-		r = list_entry(list, struct dlm_rsb, res_root_list);
-		if (r->res_nodeid)
-			continue;
-
+	for (offset = 0; list != &ls->ls_masters_list; list = list->next) {
+		r = list_entry(list, struct dlm_rsb, res_masters_list);
 		dir_nodeid = dlm_dir_nodeid(r);
 		if (dir_nodeid != nodeid)
 			continue;
@@ -294,7 +288,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 	 * terminating record.
 	 */
 
-	if ((list == &ls->ls_root_list) &&
+	if ((list == &ls->ls_masters_list) &&
 	    (offset + sizeof(uint16_t) <= outlen)) {
 		be_namelen = cpu_to_be16(0xFFFF);
 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
@@ -302,6 +296,6 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 		ls->ls_recover_dir_sent_msg++;
 	}
  out:
-	up_read(&ls->ls_root_sem);
+	read_unlock(&ls->ls_masters_lock);
 }
 
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 1d2ee5c2d23d..3524f2b33f2c 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -342,6 +342,7 @@ struct dlm_rsb {
 	struct list_head	res_waitqueue;
 
 	struct list_head	res_root_list;	    /* used for recovery */
+	struct list_head	res_masters_list;   /* used for recovery */
 	struct list_head	res_recover_list;   /* used for recovery */
 	int			res_recover_locks_count;
 
@@ -675,6 +676,8 @@ struct dlm_ls {
 
 	struct list_head	ls_root_list;	/* root resources */
 	struct rw_semaphore	ls_root_sem;	/* protect root_list */
+	struct list_head	ls_masters_list;	/* root resources */
+	rwlock_t		ls_masters_lock;	/* protect root_list */
 
 	const struct dlm_lockspace_ops *ls_ops;
 	void			*ls_ops_arg;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index d87464614bc5..e0ab7432ca4d 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -423,6 +423,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 	INIT_LIST_HEAD(&r->res_waitqueue);
 	INIT_LIST_HEAD(&r->res_root_list);
 	INIT_LIST_HEAD(&r->res_recover_list);
+	INIT_LIST_HEAD(&r->res_masters_list);
 
 	*r_ret = r;
 	return 0;
@@ -1168,6 +1169,7 @@ static void kill_rsb(struct kref *kref)
 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
+	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
 }
 
 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 0455dddb0797..c427c76b5f07 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -582,6 +582,8 @@ static int new_lockspace(const char *name, const char *cluster,
 	init_waitqueue_head(&ls->ls_wait_general);
 	INIT_LIST_HEAD(&ls->ls_root_list);
 	init_rwsem(&ls->ls_root_sem);
+	INIT_LIST_HEAD(&ls->ls_masters_list);
+	rwlock_init(&ls->ls_masters_lock);
 
 	spin_lock(&lslist_lock);
 	ls->ls_create_count = 1;
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 8eb42554ccb0..dfce8fc6a783 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -20,6 +20,48 @@
 #include "requestqueue.h"
 #include "recoverd.h"
 
+static int dlm_create_masters_list(struct dlm_ls *ls)
+{
+	struct rb_node *n;
+	struct dlm_rsb *r;
+	int i, error = 0;
+
+	write_lock(&ls->ls_masters_lock);
+	if (!list_empty(&ls->ls_masters_list)) {
+		log_error(ls, "root list not empty");
+		error = -EINVAL;
+		goto out;
+	}
+
+	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
+		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
+		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+			if (r->res_nodeid)
+				continue;
+
+			list_add(&r->res_masters_list, &ls->ls_masters_list);
+			dlm_hold_rsb(r);
+		}
+		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
+	}
+ out:
+	write_unlock(&ls->ls_masters_lock);
+	return error;
+}
+
+static void dlm_release_masters_list(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r, *safe;
+
+	write_lock(&ls->ls_masters_lock);
+	list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) {
+		list_del_init(&r->res_masters_list);
+		dlm_put_rsb(r);
+	}
+	write_unlock(&ls->ls_masters_lock);
+}
+
 static void dlm_create_root_list(struct dlm_ls *ls)
 {
 	struct rb_node *n;
@@ -123,6 +165,23 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
 	dlm_recover_dir_nodeid(ls);
 
+	/* Create a snapshot of all active rsbs that we are the master of.
+	 * During the barrier between dlm_recover_members_wait() and
+	 * dlm_recover_directory() other nodes can dump their necessary
+	 * directory dlm_rsb (r->res_dir_nodeid == nodeid) in rcom
+	 * communication dlm_copy_master_names() handling.
+	 *
+	 * TODO We should create a per lockspace list that contains rsbs
+	 * that we are the master of. Instead of creating this list during
+	 * recovery, we could keep track of those rsbs during normal locking
+	 * and let recovery use the list when necessary.
+	 */
+	error = dlm_create_masters_list(ls);
+	if (error) {
+		log_rinfo(ls, "dlm_create_masters_list error %d", error);
+		goto fail;
+	}
+
 	ls->ls_recover_dir_sent_res = 0;
 	ls->ls_recover_dir_sent_msg = 0;
 	ls->ls_recover_locks_in = 0;
@@ -132,6 +191,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	error = dlm_recover_members_wait(ls, rv->seq);
 	if (error) {
 		log_rinfo(ls, "dlm_recover_members_wait error %d", error);
+		dlm_release_masters_list(ls);
 		goto fail;
 	}
 
@@ -145,6 +205,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	error = dlm_recover_directory(ls, rv->seq);
 	if (error) {
 		log_rinfo(ls, "dlm_recover_directory error %d", error);
+		dlm_release_masters_list(ls);
 		goto fail;
 	}
 
@@ -153,9 +214,12 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	error = dlm_recover_directory_wait(ls, rv->seq);
 	if (error) {
 		log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
+		dlm_release_masters_list(ls);
 		goto fail;
 	}
 
+	dlm_release_masters_list(ls);
+
 	log_rinfo(ls, "dlm_recover_directory %u out %u messages",
 		  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
 
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCHv4 dlm/next 06/15] dlm: move root_list to ls_recover() stack
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
                   ` (4 preceding siblings ...)
  2024-04-02 19:18 ` [PATCHv4 dlm/next 05/15] dlm: move master dir dump to own list Alexander Aring
@ 2024-04-02 19:18 ` Alexander Aring
  2024-04-02 19:18 ` [PATCHv4 dlm/next 07/15] dlm: implement directory dump context Alexander Aring
                   ` (8 subsequent siblings)
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:18 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch moves the per-lockspace ls_root_list, which is mainly used to
snapshot all dlm_rsb structures from the hash table into a list for
recovery handling, onto the stack of the recovery function ls_recover().

Doing so shows that there is no need to lock ls_root_list: the list is
created at the beginning of ls_recover() and destroyed at its end, and
since ls_recover() cannot run concurrently it is safe to collect all
rsbs on the stack-local list via the rsb's res_root_list holder.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dir.c          |  6 ++---
 fs/dlm/dir.h          |  3 ++-
 fs/dlm/dlm_internal.h |  6 ++---
 fs/dlm/lock.c         |  6 ++---
 fs/dlm/lock.h         |  2 +-
 fs/dlm/lockspace.c    |  2 --
 fs/dlm/recover.c      | 30 ++++++++----------------
 fs/dlm/recover.h      |  8 ++++---
 fs/dlm/recoverd.c     | 54 ++++++++++++++++++-------------------------
 9 files changed, 47 insertions(+), 70 deletions(-)

diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 10753486049a..3da00c46cbb3 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -47,15 +47,13 @@ int dlm_dir_nodeid(struct dlm_rsb *r)
 	return r->res_dir_nodeid;
 }
 
-void dlm_recover_dir_nodeid(struct dlm_ls *ls)
+void dlm_recover_dir_nodeid(struct dlm_ls *ls, const struct list_head *root_list)
 {
 	struct dlm_rsb *r;
 
-	down_read(&ls->ls_root_sem);
-	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry(r, root_list, res_root_list) {
 		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
 	}
-	up_read(&ls->ls_root_sem);
 }
 
 int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
diff --git a/fs/dlm/dir.h b/fs/dlm/dir.h
index 39ecb69d7ef3..5b2a7ee3762d 100644
--- a/fs/dlm/dir.h
+++ b/fs/dlm/dir.h
@@ -14,7 +14,8 @@
 
 int dlm_dir_nodeid(struct dlm_rsb *rsb);
 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash);
-void dlm_recover_dir_nodeid(struct dlm_ls *ls);
+void dlm_recover_dir_nodeid(struct dlm_ls *ls,
+			    const struct list_head *root_list);
 int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq);
 void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 			   char *outbuf, int outlen, int nodeid);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 3524f2b33f2c..f434325d5bc8 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -674,10 +674,8 @@ struct dlm_ls {
 	wait_queue_head_t	ls_recover_lock_wait;
 	spinlock_t		ls_clear_proc_locks;
 
-	struct list_head	ls_root_list;	/* root resources */
-	struct rw_semaphore	ls_root_sem;	/* protect root_list */
-	struct list_head	ls_masters_list;	/* root resources */
-	rwlock_t		ls_masters_lock;	/* protect root_list */
+	struct list_head	ls_masters_list; /* root resources */
+	rwlock_t		ls_masters_lock; /* protect root_list */
 
 	const struct dlm_lockspace_ops *ls_ops;
 	void			*ls_ops_arg;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index e0ab7432ca4d..43a2f4d0af53 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -5227,7 +5227,7 @@ static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
 
 /* Get rid of locks held by nodes that are gone. */
 
-void dlm_recover_purge(struct dlm_ls *ls)
+void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
 {
 	struct dlm_rsb *r;
 	struct dlm_member *memb;
@@ -5246,8 +5246,7 @@ void dlm_recover_purge(struct dlm_ls *ls)
 	if (!nodes_count)
 		return;
 
-	down_write(&ls->ls_root_sem);
-	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry(r, root_list, res_root_list) {
 		hold_rsb(r);
 		lock_rsb(r);
 		if (is_master(r)) {
@@ -5262,7 +5261,6 @@ void dlm_recover_purge(struct dlm_ls *ls)
 		unhold_rsb(r);
 		cond_resched();
 	}
-	up_write(&ls->ls_root_sem);
 
 	if (lkb_count)
 		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index b54e2cbbe6e2..c8ff7780d3cc 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -31,7 +31,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
 			struct dlm_rsb **r_ret);
 
-void dlm_recover_purge(struct dlm_ls *ls);
+void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list);
 void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
 void dlm_recover_grant(struct dlm_ls *ls);
 int dlm_recover_waiters_post(struct dlm_ls *ls);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index c427c76b5f07..da756e5c0f6c 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -580,8 +580,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	ls->ls_recover_list_count = 0;
 	ls->ls_local_handle = ls;
 	init_waitqueue_head(&ls->ls_wait_general);
-	INIT_LIST_HEAD(&ls->ls_root_list);
-	init_rwsem(&ls->ls_root_sem);
 	INIT_LIST_HEAD(&ls->ls_masters_list);
 	rwlock_init(&ls->ls_masters_lock);
 
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 6abc283f8f36..172c6b73f37a 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -519,7 +519,8 @@ static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
  * the correct dir node.
  */
 
-int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
+int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq,
+			const struct list_head *root_list)
 {
 	struct dlm_rsb *r;
 	unsigned int total = 0;
@@ -529,10 +530,8 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
 
 	log_rinfo(ls, "dlm_recover_masters");
 
-	down_read(&ls->ls_root_sem);
-	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry(r, root_list, res_root_list) {
 		if (dlm_recovery_stopped(ls)) {
-			up_read(&ls->ls_root_sem);
 			error = -EINTR;
 			goto out;
 		}
@@ -546,12 +545,9 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
 		cond_resched();
 		total++;
 
-		if (error) {
-			up_read(&ls->ls_root_sem);
+		if (error)
 			goto out;
-		}
 	}
-	up_read(&ls->ls_root_sem);
 
 	log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);
 
@@ -656,13 +652,13 @@ static int recover_locks(struct dlm_rsb *r, uint64_t seq)
 	return error;
 }
 
-int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
+int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq,
+		      const struct list_head *root_list)
 {
 	struct dlm_rsb *r;
 	int error, count = 0;
 
-	down_read(&ls->ls_root_sem);
-	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry(r, root_list, res_root_list) {
 		if (is_master(r)) {
 			rsb_clear_flag(r, RSB_NEW_MASTER);
 			continue;
@@ -673,19 +669,15 @@ int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
 
 		if (dlm_recovery_stopped(ls)) {
 			error = -EINTR;
-			up_read(&ls->ls_root_sem);
 			goto out;
 		}
 
 		error = recover_locks(r, seq);
-		if (error) {
-			up_read(&ls->ls_root_sem);
+		if (error)
 			goto out;
-		}
 
 		count += r->res_recover_locks_count;
 	}
-	up_read(&ls->ls_root_sem);
 
 	log_rinfo(ls, "dlm_recover_locks %d out", count);
 
@@ -854,13 +846,12 @@ static void recover_grant(struct dlm_rsb *r)
 		rsb_set_flag(r, RSB_RECOVER_GRANT);
 }
 
-void dlm_recover_rsbs(struct dlm_ls *ls)
+void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list)
 {
 	struct dlm_rsb *r;
 	unsigned int count = 0;
 
-	down_read(&ls->ls_root_sem);
-	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry(r, root_list, res_root_list) {
 		lock_rsb(r);
 		if (is_master(r)) {
 			if (rsb_flag(r, RSB_RECOVER_CONVERT))
@@ -881,7 +872,6 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
 		rsb_clear_flag(r, RSB_NEW_MASTER2);
 		unlock_rsb(r);
 	}
-	up_read(&ls->ls_root_sem);
 
 	if (count)
 		log_rinfo(ls, "dlm_recover_rsbs %d done", count);
diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h
index 0b54550ee055..efc79a6e577d 100644
--- a/fs/dlm/recover.h
+++ b/fs/dlm/recover.h
@@ -19,12 +19,14 @@ int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq);
 int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq);
 int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq);
 int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq);
-int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq);
+int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq,
+			const struct list_head *root_list);
 int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc);
-int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq);
+int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq,
+		      const struct list_head *root_list);
 void dlm_recovered_lock(struct dlm_rsb *r);
 void dlm_clear_toss(struct dlm_ls *ls);
-void dlm_recover_rsbs(struct dlm_ls *ls);
+void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list);
 
 #endif				/* __RECOVER_DOT_H__ */
 
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index dfce8fc6a783..f6acc7351625 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -62,23 +62,17 @@ static void dlm_release_masters_list(struct dlm_ls *ls)
 	write_unlock(&ls->ls_masters_lock);
 }
 
-static void dlm_create_root_list(struct dlm_ls *ls)
+static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
 {
 	struct rb_node *n;
 	struct dlm_rsb *r;
 	int i;
 
-	down_write(&ls->ls_root_sem);
-	if (!list_empty(&ls->ls_root_list)) {
-		log_error(ls, "root list not empty");
-		goto out;
-	}
-
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			list_add(&r->res_root_list, &ls->ls_root_list);
+			list_add(&r->res_root_list, root_list);
 			dlm_hold_rsb(r);
 		}
 
@@ -86,20 +80,16 @@ static void dlm_create_root_list(struct dlm_ls *ls)
 			log_error(ls, "%s toss not empty", __func__);
 		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
- out:
-	up_write(&ls->ls_root_sem);
 }
 
-static void dlm_release_root_list(struct dlm_ls *ls)
+static void dlm_release_root_list(struct list_head *root_list)
 {
 	struct dlm_rsb *r, *safe;
 
-	down_write(&ls->ls_root_sem);
-	list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
+	list_for_each_entry_safe(r, safe, root_list, res_root_list) {
 		list_del_init(&r->res_root_list);
 		dlm_put_rsb(r);
 	}
-	up_write(&ls->ls_root_sem);
 }
 
 /* If the start for which we're re-enabling locking (seq) has been superseded
@@ -131,6 +121,7 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 
 static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 {
+	LIST_HEAD(root_list);
 	unsigned long start;
 	int error, neg = 0;
 
@@ -147,7 +138,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	 * routines.
 	 */
 
-	dlm_create_root_list(ls);
+	dlm_create_root_list(ls, &root_list);
 
 	/*
 	 * Add or remove nodes from the lockspace's ls_nodes list.
@@ -163,7 +154,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		goto fail;
 	}
 
-	dlm_recover_dir_nodeid(ls);
+	dlm_recover_dir_nodeid(ls, &root_list);
 
 	/* Create a snapshot of all active rsbs that we are the master of.
 	 * During the barrier between dlm_recover_members_wait() and
@@ -179,7 +170,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	error = dlm_create_masters_list(ls);
 	if (error) {
 		log_rinfo(ls, "dlm_create_masters_list error %d", error);
-		goto fail;
+		goto fail_root_list;
 	}
 
 	ls->ls_recover_dir_sent_res = 0;
@@ -192,7 +183,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	if (error) {
 		log_rinfo(ls, "dlm_recover_members_wait error %d", error);
 		dlm_release_masters_list(ls);
-		goto fail;
+		goto fail_root_list;
 	}
 
 	start = jiffies;
@@ -206,7 +197,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	if (error) {
 		log_rinfo(ls, "dlm_recover_directory error %d", error);
 		dlm_release_masters_list(ls);
-		goto fail;
+		goto fail_root_list;
 	}
 
 	dlm_set_recover_status(ls, DLM_RS_DIR);
@@ -215,7 +206,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 	if (error) {
 		log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
 		dlm_release_masters_list(ls);
-		goto fail;
+		goto fail_root_list;
 	}
 
 	dlm_release_masters_list(ls);
@@ -233,7 +224,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
 	if (dlm_recovery_stopped(ls)) {
 		error = -EINTR;
-		goto fail;
+		goto fail_root_list;
 	}
 
 	if (neg || dlm_no_directory(ls)) {
@@ -241,27 +232,27 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		 * Clear lkb's for departed nodes.
 		 */
 
-		dlm_recover_purge(ls);
+		dlm_recover_purge(ls, &root_list);
 
 		/*
 		 * Get new master nodeid's for rsb's that were mastered on
 		 * departed nodes.
 		 */
 
-		error = dlm_recover_masters(ls, rv->seq);
+		error = dlm_recover_masters(ls, rv->seq, &root_list);
 		if (error) {
 			log_rinfo(ls, "dlm_recover_masters error %d", error);
-			goto fail;
+			goto fail_root_list;
 		}
 
 		/*
 		 * Send our locks on remastered rsb's to the new masters.
 		 */
 
-		error = dlm_recover_locks(ls, rv->seq);
+		error = dlm_recover_locks(ls, rv->seq, &root_list);
 		if (error) {
 			log_rinfo(ls, "dlm_recover_locks error %d", error);
-			goto fail;
+			goto fail_root_list;
 		}
 
 		dlm_set_recover_status(ls, DLM_RS_LOCKS);
@@ -269,7 +260,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		error = dlm_recover_locks_wait(ls, rv->seq);
 		if (error) {
 			log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
-			goto fail;
+			goto fail_root_list;
 		}
 
 		log_rinfo(ls, "dlm_recover_locks %u in",
@@ -281,7 +272,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		 * settings.
 		 */
 
-		dlm_recover_rsbs(ls);
+		dlm_recover_rsbs(ls, &root_list);
 	} else {
 		/*
 		 * Other lockspace members may be going through the "neg" steps
@@ -293,11 +284,11 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		error = dlm_recover_locks_wait(ls, rv->seq);
 		if (error) {
 			log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
-			goto fail;
+			goto fail_root_list;
 		}
 	}
 
-	dlm_release_root_list(ls);
+	dlm_release_root_list(&root_list);
 
 	/*
 	 * Purge directory-related requests that are saved in requestqueue.
@@ -346,8 +337,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
 	return 0;
 
+ fail_root_list:
+	dlm_release_root_list(&root_list);
  fail:
-	dlm_release_root_list(ls);
 	mutex_unlock(&ls->ls_recoverd_active);
 
 	return error;
-- 
2.43.0



* [PATCHv4 dlm/next 07/15] dlm: implement directory dump context
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
                   ` (5 preceding siblings ...)
  2024-04-02 19:18 ` [PATCHv4 dlm/next 06/15] dlm: move root_list to ls_recover() stack Alexander Aring
@ 2024-04-02 19:18 ` Alexander Aring
  2024-04-02 19:18 ` [PATCHv4 dlm/next 08/15] dlm: drop holding waiters mutex in waiters recovery Alexander Aring
                   ` (7 subsequent siblings)
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:18 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch introduces a context to keep track of a directory dump in
DLM. For now it only adds more sanity checks, e.g. whether the recovery
sequence number changed while the directory was being dumped. Another
change is that the dump is now tracked per nodeid, which can later be
used to log how many entries in how many chunks were sent to a specific
nodeid.

The whole dump currently relies on the recovery barrier, because the
resource list is not manipulated during that time; this may be improved
later. For now we add more sanity checks in the recovery path to confirm
there is no issue with the current behaviour, e.g. checking that the
entry returned by the last resource lookup matches the list entry
recorded at the end of the previous chunk.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dir.c          | 115 ++++++++++++++++++++++++++++++++++++++++--
 fs/dlm/dlm_internal.h |   4 +-
 fs/dlm/lockspace.c    |   2 +
 fs/dlm/recoverd.c     |   5 --
 4 files changed, 116 insertions(+), 10 deletions(-)

diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 3da00c46cbb3..0dc8a1d9e411 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -224,6 +224,80 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 	return NULL;
 }
 
+struct dlm_dir_dump {
+	/* init values to match if whole
+	 * dump fits to one seq. Sanity check only.
+	 */
+	uint64_t seq_init;
+	uint64_t nodeid_init;
+	/* compare local pointer with last lookup,
+	 * just a sanity check.
+	 */
+	struct list_head *last;
+
+	unsigned int sent_res; /* for log info */
+	unsigned int sent_msg; /* for log info */
+
+	struct list_head list;
+};
+
+static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
+{
+	struct dlm_dir_dump *dd, *safe;
+
+	write_lock(&ls->ls_dir_dump_lock);
+	list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) {
+		if (dd->nodeid_init == nodeid) {
+			log_error(ls, "drop dump seq %llu",
+				 (unsigned long long)dd->seq_init);
+			list_del(&dd->list);
+			kfree(dd);
+		}
+	}
+	write_unlock(&ls->ls_dir_dump_lock);
+}
+
+static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid)
+{
+	struct dlm_dir_dump *iter, *dd = NULL;
+
+	read_lock(&ls->ls_dir_dump_lock);
+	list_for_each_entry(iter, &ls->ls_dir_dump_list, list) {
+		if (iter->nodeid_init == nodeid) {
+			dd = iter;
+			break;
+		}
+	}
+	read_unlock(&ls->ls_dir_dump_lock);
+
+	return dd;
+}
+
+static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid)
+{
+	struct dlm_dir_dump *dd;
+
+	dd = lookup_dir_dump(ls, nodeid);
+	if (dd) {
+		log_error(ls, "found ongoing dir dump for node %d, will drop it",
+			  nodeid);
+		drop_dir_ctx(ls, nodeid);
+	}
+
+	dd = kzalloc(sizeof(*dd), GFP_ATOMIC);
+	if (!dd)
+		return NULL;
+
+	dd->seq_init = ls->ls_recover_seq;
+	dd->nodeid_init = nodeid;
+
+	write_lock(&ls->ls_dir_dump_lock);
+	list_add(&dd->list, &ls->ls_dir_dump_list);
+	write_unlock(&ls->ls_dir_dump_lock);
+
+	return dd;
+}
+
 /* Find the rsb where we left off (or start again), then send rsb names
    for rsb's we're master of and whose directory node matches the requesting
    node.  inbuf is the rsb name last sent, inlen is the name's length */
@@ -234,11 +308,20 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 	struct list_head *list;
 	struct dlm_rsb *r;
 	int offset = 0, dir_nodeid;
+	struct dlm_dir_dump *dd;
 	__be16 be_namelen;
 
 	read_lock(&ls->ls_masters_lock);
 
 	if (inlen > 1) {
+		dd = lookup_dir_dump(ls, nodeid);
+		if (!dd) {
+			log_error(ls, "failed to lookup dir dump context nodeid: %d",
+				  nodeid);
+			goto out;
+		}
+
+		/* next chunk in dump */
 		r = find_rsb_root(ls, inbuf, inlen);
 		if (!r) {
 			log_error(ls, "copy_master_names from %d start %d %.*s",
@@ -246,8 +329,25 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 			goto out;
 		}
 		list = r->res_masters_list.next;
+
+		/* sanity checks */
+		if (dd->last != &r->res_masters_list ||
+		    dd->seq_init != ls->ls_recover_seq) {
+			log_error(ls, "failed dir dump sanity check seq_init: %llu seq: %llu",
+				  (unsigned long long)dd->seq_init,
+				  (unsigned long long)ls->ls_recover_seq);
+			goto out;
+		}
 	} else {
+		dd = init_dir_dump(ls, nodeid);
+		if (!dd) {
+			log_error(ls, "failed to allocate dir dump context");
+			goto out;
+		}
+
+		/* start dump */
 		list = ls->ls_masters_list.next;
+		dd->last = list;
 	}
 
 	for (offset = 0; list != &ls->ls_masters_list; list = list->next) {
@@ -269,7 +369,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 			be_namelen = cpu_to_be16(0);
 			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 			offset += sizeof(__be16);
-			ls->ls_recover_dir_sent_msg++;
+			dd->sent_msg++;
 			goto out;
 		}
 
@@ -278,7 +378,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 		offset += sizeof(__be16);
 		memcpy(outbuf + offset, r->res_name, r->res_length);
 		offset += r->res_length;
-		ls->ls_recover_dir_sent_res++;
+		dd->sent_res++;
+		dd->last = list;
 	}
 
 	/*
@@ -288,10 +389,18 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 
 	if ((list == &ls->ls_masters_list) &&
 	    (offset + sizeof(uint16_t) <= outlen)) {
+		/* end dump */
 		be_namelen = cpu_to_be16(0xFFFF);
 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 		offset += sizeof(__be16);
-		ls->ls_recover_dir_sent_msg++;
+		dd->sent_msg++;
+		log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
+			  nodeid, dd->sent_res, dd->sent_msg);
+
+		write_lock(&ls->ls_dir_dump_lock);
+		list_del_init(&dd->list);
+		write_unlock(&ls->ls_dir_dump_lock);
+		kfree(dd);
 	}
  out:
 	read_unlock(&ls->ls_masters_lock);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index f434325d5bc8..e03a379832d5 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -660,8 +660,6 @@ struct dlm_ls {
 	struct mutex		ls_requestqueue_mutex;
 	struct dlm_rcom		*ls_recover_buf;
 	int			ls_recover_nodeid; /* for debugging */
-	unsigned int		ls_recover_dir_sent_res; /* for log info */
-	unsigned int		ls_recover_dir_sent_msg; /* for log info */
 	unsigned int		ls_recover_locks_in; /* for log info */
 	uint64_t		ls_rcom_seq;
 	spinlock_t		ls_rcom_spin;
@@ -676,6 +674,8 @@ struct dlm_ls {
 
 	struct list_head	ls_masters_list; /* root resources */
 	rwlock_t		ls_masters_lock; /* protect root_list */
+	struct list_head	ls_dir_dump_list; /* dir dump contexts */
+	rwlock_t		ls_dir_dump_lock; /* protect ls_dir_dump_list */
 
 	const struct dlm_lockspace_ops *ls_ops;
 	void			*ls_ops_arg;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index da756e5c0f6c..af7769f8e38c 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -582,6 +582,8 @@ static int new_lockspace(const char *name, const char *cluster,
 	init_waitqueue_head(&ls->ls_wait_general);
 	INIT_LIST_HEAD(&ls->ls_masters_list);
 	rwlock_init(&ls->ls_masters_lock);
+	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
+	rwlock_init(&ls->ls_dir_dump_lock);
 
 	spin_lock(&lslist_lock);
 	ls->ls_create_count = 1;
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index f6acc7351625..0b1a62167798 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -173,8 +173,6 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		goto fail_root_list;
 	}
 
-	ls->ls_recover_dir_sent_res = 0;
-	ls->ls_recover_dir_sent_msg = 0;
 	ls->ls_recover_locks_in = 0;
 
 	dlm_set_recover_status(ls, DLM_RS_NODES);
@@ -211,9 +209,6 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
 	dlm_release_masters_list(ls);
 
-	log_rinfo(ls, "dlm_recover_directory %u out %u messages",
-		  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
-
 	/*
 	 * We may have outstanding operations that are waiting for a reply from
 	 * a failed node.  Mark these to be resent after recovery.  Unlock and
-- 
2.43.0



* [PATCHv4 dlm/next 08/15] dlm: drop holding waiters mutex in waiters recovery
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
                   ` (6 preceding siblings ...)
  2024-04-02 19:18 ` [PATCHv4 dlm/next 07/15] dlm: implement directory dump context Alexander Aring
@ 2024-04-02 19:18 ` Alexander Aring
  2024-04-02 19:18 ` [PATCHv4 dlm/next 09/15] dlm: convert ls_waiters_mutex to spinlock Alexander Aring
                   ` (6 subsequent siblings)
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:18 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch stops holding the ls_waiters_mutex in
dlm_recover_waiters_pre(). That function is only called while recovery
handling is being done for the specific lockspace. During this time no
new lock requests can be initiated and no dlm messages that manipulate
the lockspace waiters list are processed.

Only debugfs could access the lockspace waiters list while
dlm_recover_waiters_pre() may manipulate it. This is no longer possible
because debugfs now holds the recovery lock when it accesses the
waiters list.

A check was added to remove_from_waiters_ms() for local dlm messaging
to verify that the lockspace really is stopped and no new lock requests
can be initiated.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c | 13 +++++++++++++
 fs/dlm/lock.c     | 17 +++++++++--------
 fs/dlm/lock.h     |  1 +
 3 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 19cdedd56629..c238a9308323 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -737,6 +737,12 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 	size_t len = DLM_DEBUG_BUF_LEN, pos = 0, ret, rv;
 
 	mutex_lock(&debug_buf_lock);
+	ret = dlm_lock_recovery_try(ls);
+	if (!ret) {
+		rv = -EAGAIN;
+		goto out;
+	}
+
 	mutex_lock(&ls->ls_waiters_mutex);
 	memset(debug_buf, 0, sizeof(debug_buf));
 
@@ -749,8 +755,10 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 		pos += ret;
 	}
 	mutex_unlock(&ls->ls_waiters_mutex);
+	dlm_unlock_recovery(ls);
 
 	rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
+out:
 	mutex_unlock(&debug_buf_lock);
 	return rv;
 }
@@ -772,7 +780,12 @@ static ssize_t waiters_write(struct file *file, const char __user *user_buf,
 	if (n != 3)
 		return -EINVAL;
 
+	error = dlm_lock_recovery_try(ls);
+	if (!error)
+		return -EAGAIN;
+
 	error = dlm_debug_add_lkb_to_waiters(ls, lkb_id, mstype, to_nodeid);
+	dlm_unlock_recovery(ls);
 	if (error)
 		return error;
 
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 43a2f4d0af53..395b904a82f4 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -201,7 +201,7 @@ void dlm_dump_rsb(struct dlm_rsb *r)
 
 /* Threads cannot use the lockspace while it's being recovered */
 
-static inline void dlm_lock_recovery(struct dlm_ls *ls)
+void dlm_lock_recovery(struct dlm_ls *ls)
 {
 	down_read(&ls->ls_in_recovery);
 }
@@ -1556,7 +1556,11 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 }
 
 /* Handles situations where we might be processing a "fake" or "local" reply in
-   which we can't try to take waiters_mutex again. */
+ * the recovery context which stops any locking activity. Only debugfs might
+ * change the lockspace waiters, but it will hold the recovery lock to ensure
+ * that remove_from_waiters_ms() in the local case is the only user manipulating the
+ * lockspace waiters in recovery context.
+ */
 
 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
 				  const struct dlm_message *ms, bool local)
@@ -1566,6 +1570,9 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
 
 	if (!local)
 		mutex_lock(&ls->ls_waiters_mutex);
+	else
+		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
+			     !dlm_locking_stopped(ls));
 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
 	if (!local)
 		mutex_unlock(&ls->ls_waiters_mutex);
@@ -4398,7 +4405,6 @@ static void _receive_convert_reply(struct dlm_lkb *lkb,
 	if (error)
 		goto out;
 
-	/* local reply can happen with waiters_mutex held */
 	error = remove_from_waiters_ms(lkb, ms, local);
 	if (error)
 		goto out;
@@ -4437,7 +4443,6 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb,
 	if (error)
 		goto out;
 
-	/* local reply can happen with waiters_mutex held */
 	error = remove_from_waiters_ms(lkb, ms, local);
 	if (error)
 		goto out;
@@ -4489,7 +4494,6 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb,
 	if (error)
 		goto out;
 
-	/* local reply can happen with waiters_mutex held */
 	error = remove_from_waiters_ms(lkb, ms, local);
 	if (error)
 		goto out;
@@ -4890,8 +4894,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 	if (!ms_local)
 		return;
 
-	mutex_lock(&ls->ls_waiters_mutex);
-
 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
 
 		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
@@ -4984,7 +4986,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 		}
 		schedule();
 	}
-	mutex_unlock(&ls->ls_waiters_mutex);
 	kfree(ms_local);
 }
 
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index c8ff7780d3cc..b2fd74a2f8eb 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -23,6 +23,7 @@ void dlm_hold_rsb(struct dlm_rsb *r);
 int dlm_put_lkb(struct dlm_lkb *lkb);
 void dlm_scan_rsbs(struct dlm_ls *ls);
 int dlm_lock_recovery_try(struct dlm_ls *ls);
+void dlm_lock_recovery(struct dlm_ls *ls);
 void dlm_unlock_recovery(struct dlm_ls *ls);
 
 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCHv4 dlm/next 09/15] dlm: convert ls_waiters_mutex to spinlock
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
                   ` (7 preceding siblings ...)
  2024-04-02 19:18 ` [PATCHv4 dlm/next 08/15] dlm: drop holding waiters mutex in waiters recovery Alexander Aring
@ 2024-04-02 19:18 ` Alexander Aring
  2024-04-02 19:18 ` [PATCHv4 dlm/next 10/15] dlm: convert res_lock " Alexander Aring
                   ` (5 subsequent siblings)
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:18 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch converts the per dlm lockspace waiters lock from a mutex to
a spinlock to prepare for switching to softirq context when parsing DLM
messages.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c     |  4 ++--
 fs/dlm/dlm_internal.h |  2 +-
 fs/dlm/lock.c         | 20 ++++++++++----------
 fs/dlm/lockspace.c    |  2 +-
 4 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index c238a9308323..487dcf05d076 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -743,7 +743,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 		goto out;
 	}
 
-	mutex_lock(&ls->ls_waiters_mutex);
+	spin_lock(&ls->ls_waiters_lock);
 	memset(debug_buf, 0, sizeof(debug_buf));
 
 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
@@ -754,7 +754,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 			break;
 		pos += ret;
 	}
-	mutex_unlock(&ls->ls_waiters_mutex);
+	spin_unlock(&ls->ls_waiters_lock);
 	dlm_unlock_recovery(ls);
 
 	rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index e03a379832d5..98029fd5cd2b 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -595,7 +595,7 @@ struct dlm_ls {
 	struct dlm_rsbtable	*ls_rsbtbl;
 	uint32_t		ls_rsbtbl_size;
 
-	struct mutex		ls_waiters_mutex;
+	spinlock_t		ls_waiters_lock;
 	struct list_head	ls_waiters;	/* lkbs needing a reply */
 
 	struct mutex		ls_orphans_mutex;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 395b904a82f4..af677add4f5f 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1406,7 +1406,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int error = 0;
 
-	mutex_lock(&ls->ls_waiters_mutex);
+	spin_lock(&ls->ls_waiters_lock);
 
 	if (is_overlap_unlock(lkb) ||
 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
@@ -1449,7 +1449,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 		log_error(ls, "addwait error %x %d flags %x %d %d %s",
 			  lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
-	mutex_unlock(&ls->ls_waiters_mutex);
+	spin_unlock(&ls->ls_waiters_lock);
 	return error;
 }
 
@@ -1549,9 +1549,9 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int error;
 
-	mutex_lock(&ls->ls_waiters_mutex);
+	spin_lock(&ls->ls_waiters_lock);
 	error = _remove_from_waiters(lkb, mstype, NULL);
-	mutex_unlock(&ls->ls_waiters_mutex);
+	spin_unlock(&ls->ls_waiters_lock);
 	return error;
 }
 
@@ -1569,13 +1569,13 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
 	int error;
 
 	if (!local)
-		mutex_lock(&ls->ls_waiters_mutex);
+		spin_lock(&ls->ls_waiters_lock);
 	else
 		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
 			     !dlm_locking_stopped(ls));
 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
 	if (!local)
-		mutex_unlock(&ls->ls_waiters_mutex);
+		spin_unlock(&ls->ls_waiters_lock);
 	return error;
 }
 
@@ -4993,7 +4993,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb = NULL, *iter;
 
-	mutex_lock(&ls->ls_waiters_mutex);
+	spin_lock(&ls->ls_waiters_lock);
 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
 		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
 			hold_lkb(iter);
@@ -5001,7 +5001,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 			break;
 		}
 	}
-	mutex_unlock(&ls->ls_waiters_mutex);
+	spin_unlock(&ls->ls_waiters_lock);
 
 	return lkb;
 }
@@ -5101,9 +5101,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 		}
 
 		/* Forcibly remove from waiters list */
-		mutex_lock(&ls->ls_waiters_mutex);
+		spin_lock(&ls->ls_waiters_lock);
 		list_del_init(&lkb->lkb_wait_reply);
-		mutex_unlock(&ls->ls_waiters_mutex);
+		spin_unlock(&ls->ls_waiters_lock);
 
 		/*
 		 * The lkb is now clear of all prior waiters state and can be
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index af7769f8e38c..945139805605 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -515,7 +515,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	spin_lock_init(&ls->ls_lkbidr_spin);
 
 	INIT_LIST_HEAD(&ls->ls_waiters);
-	mutex_init(&ls->ls_waiters_mutex);
+	spin_lock_init(&ls->ls_waiters_lock);
 	INIT_LIST_HEAD(&ls->ls_orphans);
 	mutex_init(&ls->ls_orphans_mutex);
 
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCHv4 dlm/next 10/15] dlm: convert res_lock to spinlock
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
                   ` (8 preceding siblings ...)
  2024-04-02 19:18 ` [PATCHv4 dlm/next 09/15] dlm: convert ls_waiters_mutex to spinlock Alexander Aring
@ 2024-04-02 19:18 ` Alexander Aring
  2024-04-02 19:18 ` [PATCHv4 dlm/next 11/15] dlm: make requestqueue handling non sleepable Alexander Aring
                   ` (4 subsequent siblings)
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:18 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch converts the per dlm rsb res_lock from a mutex to a spinlock
to prepare for switching to softirq context when parsing DLM messages.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h | 2 +-
 fs/dlm/lock.c         | 2 +-
 fs/dlm/lock.h         | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 98029fd5cd2b..9f98a815f935 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -320,7 +320,7 @@ struct dlm_lkb {
 struct dlm_rsb {
 	struct dlm_ls		*res_ls;	/* the lockspace */
 	struct kref		res_ref;
-	struct mutex		res_mutex;
+	spinlock_t		res_lock;
 	unsigned long		res_flags;
 	int			res_length;	/* length of rsb name */
 	int			res_nodeid;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index af677add4f5f..7b309231eebd 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -415,7 +415,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 	r->res_ls = ls;
 	r->res_length = len;
 	memcpy(r->res_name, name, len);
-	mutex_init(&r->res_mutex);
+	spin_lock_init(&r->res_lock);
 
 	INIT_LIST_HEAD(&r->res_lookup);
 	INIT_LIST_HEAD(&r->res_grantqueue);
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index b2fd74a2f8eb..0f6b2700c0da 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -69,12 +69,12 @@ static inline int is_master(struct dlm_rsb *r)
 
 static inline void lock_rsb(struct dlm_rsb *r)
 {
-	mutex_lock(&r->res_mutex);
+	spin_lock(&r->res_lock);
 }
 
 static inline void unlock_rsb(struct dlm_rsb *r)
 {
-	mutex_unlock(&r->res_mutex);
+	spin_unlock(&r->res_lock);
 }
 
 #endif
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCHv4 dlm/next 11/15] dlm: make requestqueue handling non sleepable
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
                   ` (9 preceding siblings ...)
  2024-04-02 19:18 ` [PATCHv4 dlm/next 10/15] dlm: convert res_lock " Alexander Aring
@ 2024-04-02 19:18 ` Alexander Aring
  2024-04-02 19:18 ` [PATCHv4 dlm/next 12/15] dlm: ls_recv_active semaphore to rwlock Alexander Aring
                   ` (3 subsequent siblings)
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:18 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch removes the ls_requestqueue_wait waitqueue and converts the
ls_requestqueue_mutex to an rwlock. Instead of calling wait_event() in
dlm processing, which waits until all saved messages are processed
before allowing new message processing after recovery is done, this
patch uses a bitflag to signal whether a message should be saved for
later. When recovery has processed all saved messages, the bit is
cleared again and new messages are processed directly.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  5 ++---
 fs/dlm/lock.c         | 16 ++++++++++++++--
 fs/dlm/lockspace.c    |  4 +---
 fs/dlm/member.c       |  5 +++++
 fs/dlm/requestqueue.c | 41 ++++++++---------------------------------
 5 files changed, 30 insertions(+), 41 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 9f98a815f935..61820b8c47a7 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -655,9 +655,7 @@ struct dlm_ls {
 	struct rw_semaphore	ls_in_recovery;	/* block local requests */
 	struct rw_semaphore	ls_recv_active;	/* block dlm_recv */
 	struct list_head	ls_requestqueue;/* queue remote requests */
-	atomic_t		ls_requestqueue_cnt;
-	wait_queue_head_t	ls_requestqueue_wait;
-	struct mutex		ls_requestqueue_mutex;
+	rwlock_t		ls_requestqueue_lock;
 	struct dlm_rcom		*ls_recover_buf;
 	int			ls_recover_nodeid; /* for debugging */
 	unsigned int		ls_recover_locks_in; /* for log info */
@@ -717,6 +715,7 @@ struct dlm_ls {
 #define LSFL_UEVENT_WAIT	7
 #define LSFL_CB_DELAY		9
 #define LSFL_NODIR		10
+#define LSFL_RECV_MSG_BLOCKED	11
 
 #define DLM_PROC_FLAGS_CLOSING 1
 #define DLM_PROC_FLAGS_COMPAT  2
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 7b309231eebd..98d9c5a4be00 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -4752,20 +4752,32 @@ static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
 				int nodeid)
 {
-	if (dlm_locking_stopped(ls)) {
+try_again:
+	read_lock(&ls->ls_requestqueue_lock);
+	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
 		/* If we were a member of this lockspace, left, and rejoined,
 		   other nodes may still be sending us messages from the
 		   lockspace generation before we left. */
 		if (WARN_ON_ONCE(!ls->ls_generation)) {
+			read_unlock(&ls->ls_requestqueue_lock);
 			log_limit(ls, "receive %d from %d ignore old gen",
 				  le32_to_cpu(ms->m_type), nodeid);
 			return;
 		}
 
+		read_unlock(&ls->ls_requestqueue_lock);
+		write_lock(&ls->ls_requestqueue_lock);
+		/* recheck because we now hold the write lock */
+		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
+			write_unlock(&ls->ls_requestqueue_lock);
+			goto try_again;
+		}
+
 		dlm_add_requestqueue(ls, nodeid, ms);
+		write_unlock(&ls->ls_requestqueue_lock);
 	} else {
-		dlm_wait_requestqueue(ls);
 		_receive_message(ls, ms, 0);
+		read_unlock(&ls->ls_requestqueue_lock);
 	}
 }
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 945139805605..757e473bc619 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -554,9 +554,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	init_rwsem(&ls->ls_in_recovery);
 	init_rwsem(&ls->ls_recv_active);
 	INIT_LIST_HEAD(&ls->ls_requestqueue);
-	atomic_set(&ls->ls_requestqueue_cnt, 0);
-	init_waitqueue_head(&ls->ls_requestqueue_wait);
-	mutex_init(&ls->ls_requestqueue_mutex);
+	rwlock_init(&ls->ls_requestqueue_lock);
 	spin_lock_init(&ls->ls_clear_proc_locks);
 
 	/* Due backwards compatibility with 3.1 we need to use maximum
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index be7909ead71b..707cebcdc533 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -642,6 +642,11 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
 	ls->ls_recover_seq++;
+
+	/* activate requestqueue and stop processing */
+	write_lock(&ls->ls_requestqueue_lock);
+	set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
+	write_unlock(&ls->ls_requestqueue_lock);
 	spin_unlock(&ls->ls_recover_lock);
 
 	/*
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index c05940afd063..9b646026df46 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -48,10 +48,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
 	memcpy(&e->request, ms, sizeof(*ms));
 	memcpy(&e->request.m_extra, ms->m_extra, length);
 
-	atomic_inc(&ls->ls_requestqueue_cnt);
-	mutex_lock(&ls->ls_requestqueue_mutex);
 	list_add_tail(&e->list, &ls->ls_requestqueue);
-	mutex_unlock(&ls->ls_requestqueue_mutex);
 }
 
 /*
@@ -71,16 +68,14 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 	struct dlm_message *ms;
 	int error = 0;
 
-	mutex_lock(&ls->ls_requestqueue_mutex);
-
+	write_lock(&ls->ls_requestqueue_lock);
 	for (;;) {
 		if (list_empty(&ls->ls_requestqueue)) {
-			mutex_unlock(&ls->ls_requestqueue_mutex);
+			clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
 			error = 0;
 			break;
 		}
-		e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
-		mutex_unlock(&ls->ls_requestqueue_mutex);
+		e = list_first_entry(&ls->ls_requestqueue, struct rq_entry, list);
 
 		ms = &e->request;
 
@@ -93,41 +88,23 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 			  e->recover_seq);
 
 		dlm_receive_message_saved(ls, &e->request, e->recover_seq);
-
-		mutex_lock(&ls->ls_requestqueue_mutex);
 		list_del(&e->list);
-		if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
-			wake_up(&ls->ls_requestqueue_wait);
 		kfree(e);
 
 		if (dlm_locking_stopped(ls)) {
 			log_debug(ls, "process_requestqueue abort running");
-			mutex_unlock(&ls->ls_requestqueue_mutex);
 			error = -EINTR;
 			break;
 		}
+		write_unlock(&ls->ls_requestqueue_lock);
 		schedule();
+		write_lock(&ls->ls_requestqueue_lock);
 	}
+	write_unlock(&ls->ls_requestqueue_lock);
 
 	return error;
 }
 
-/*
- * After recovery is done, locking is resumed and dlm_recoverd takes all the
- * saved requests and processes them as they would have been by dlm_recv.  At
- * the same time, dlm_recv will start receiving new requests from remote nodes.
- * We want to delay dlm_recv processing new requests until dlm_recoverd has
- * finished processing the old saved requests.  We don't check for locking
- * stopped here because dlm_ls_stop won't stop locking until it's suspended us
- * (dlm_recv).
- */
-
-void dlm_wait_requestqueue(struct dlm_ls *ls)
-{
-	wait_event(ls->ls_requestqueue_wait,
-		   atomic_read(&ls->ls_requestqueue_cnt) == 0);
-}
-
 static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
 {
 	__le32 type = ms->m_type;
@@ -158,17 +135,15 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
 	struct dlm_message *ms;
 	struct rq_entry *e, *safe;
 
-	mutex_lock(&ls->ls_requestqueue_mutex);
+	write_lock(&ls->ls_requestqueue_lock);
 	list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
 		ms =  &e->request;
 
 		if (purge_request(ls, ms, e->nodeid)) {
 			list_del(&e->list);
-			if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
-				wake_up(&ls->ls_requestqueue_wait);
 			kfree(e);
 		}
 	}
-	mutex_unlock(&ls->ls_requestqueue_mutex);
+	write_unlock(&ls->ls_requestqueue_lock);
 }
 
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCHv4 dlm/next 12/15] dlm: ls_recv_active semaphore to rwlock
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
                   ` (10 preceding siblings ...)
  2024-04-02 19:18 ` [PATCHv4 dlm/next 11/15] dlm: make requestqueue handling non sleepable Alexander Aring
@ 2024-04-02 19:18 ` Alexander Aring
  2024-04-02 19:18 ` [PATCHv4 dlm/next 13/15] dlm: remove schedule in dlm receive path Alexander Aring
                   ` (2 subsequent siblings)
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:18 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch converts the ls_recv_active rw_semaphore to an rwlock to
avoid sleeping during dlm message processing.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h | 2 +-
 fs/dlm/lock.c         | 4 ++--
 fs/dlm/lockspace.c    | 2 +-
 fs/dlm/member.c       | 4 ++--
 fs/dlm/recoverd.c     | 4 ++--
 5 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 61820b8c47a7..269c12e0824f 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -653,7 +653,7 @@ struct dlm_ls {
 	uint64_t		ls_recover_seq;
 	struct dlm_recover	*ls_recover_args;
 	struct rw_semaphore	ls_in_recovery;	/* block local requests */
-	struct rw_semaphore	ls_recv_active;	/* block dlm_recv */
+	rwlock_t		ls_recv_active;	/* block dlm_recv */
 	struct list_head	ls_requestqueue;/* queue remote requests */
 	rwlock_t		ls_requestqueue_lock;
 	struct dlm_rcom		*ls_recover_buf;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 98d9c5a4be00..2f53fdfe262a 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -4837,7 +4837,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
 	   be inactive (in this ls) before transitioning to recovery mode */
 
-	down_read(&ls->ls_recv_active);
+	read_lock(&ls->ls_recv_active);
 	if (hd->h_cmd == DLM_MSG)
 		dlm_receive_message(ls, &p->message, nodeid);
 	else if (hd->h_cmd == DLM_RCOM)
@@ -4845,7 +4845,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
 	else
 		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
 			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
-	up_read(&ls->ls_recv_active);
+	read_unlock(&ls->ls_recv_active);
 
 	dlm_put_lockspace(ls);
 }
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 757e473bc619..c021bf684fbc 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -552,7 +552,7 @@ static int new_lockspace(const char *name, const char *cluster,
 	ls->ls_recover_seq = get_random_u64();
 	ls->ls_recover_args = NULL;
 	init_rwsem(&ls->ls_in_recovery);
-	init_rwsem(&ls->ls_recv_active);
+	rwlock_init(&ls->ls_recv_active);
 	INIT_LIST_HEAD(&ls->ls_requestqueue);
 	rwlock_init(&ls->ls_requestqueue_lock);
 	spin_lock_init(&ls->ls_clear_proc_locks);
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index 707cebcdc533..ac1b555af9d6 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -630,7 +630,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	 * message to the requestqueue without races.
 	 */
 
-	down_write(&ls->ls_recv_active);
+	write_lock(&ls->ls_recv_active);
 
 	/*
 	 * Abort any recovery that's in progress (see RECOVER_STOP,
@@ -654,7 +654,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	 * requestqueue for later.
 	 */
 
-	up_write(&ls->ls_recv_active);
+	write_unlock(&ls->ls_recv_active);
 
 	/*
 	 * This in_recovery lock does two things:
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 0b1a62167798..a11ae1da2f60 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -103,7 +103,7 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 {
 	int error = -EINTR;
 
-	down_write(&ls->ls_recv_active);
+	write_lock(&ls->ls_recv_active);
 
 	spin_lock(&ls->ls_recover_lock);
 	if (ls->ls_recover_seq == seq) {
@@ -115,7 +115,7 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 	}
 	spin_unlock(&ls->ls_recover_lock);
 
-	up_write(&ls->ls_recv_active);
+	write_unlock(&ls->ls_recv_active);
 	return error;
 }
 
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCHv4 dlm/next 13/15] dlm: remove schedule in dlm receive path
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
                   ` (11 preceding siblings ...)
  2024-04-02 19:18 ` [PATCHv4 dlm/next 12/15] dlm: ls_recv_active semaphore to rwlock Alexander Aring
@ 2024-04-02 19:18 ` Alexander Aring
  2024-04-02 19:18 ` [PATCHv4 dlm/next 14/15] dlm: convert message parsing locks to disable bh Alexander Aring
  2024-04-02 19:18 ` [PATCHv4 dlm/next 15/15] dlm: do dlm message processing in softirq context Alexander Aring
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:18 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch removes an explicit schedule() call in the receive path of
dlm message processing. The goal in DLM is not to trigger any additional
scheduling while processing DLM messages. This schedule() is called
directly inside the dlm message processing path. As soon as dlm message
processing is handled in softirq context, schedule() can no longer be
called there. This patch prepares for this transition by simply removing
the schedule() call.

Calling schedule() during a burst of DLM lock requests can result in
triggering more and more lock requests without completing the pending
ones.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c | 1 -
 1 file changed, 1 deletion(-)

diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 2f53fdfe262a..e4cec14f9973 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -2543,7 +2543,6 @@ static void process_lookup_list(struct dlm_rsb *r)
 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
 		list_del_init(&lkb->lkb_rsb_lookup);
 		_request_lock(r, lkb);
-		schedule();
 	}
 }
 
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCHv4 dlm/next 14/15] dlm: convert message parsing locks to disable bh
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
                   ` (12 preceding siblings ...)
  2024-04-02 19:18 ` [PATCHv4 dlm/next 13/15] dlm: remove schedule in dlm receive path Alexander Aring
@ 2024-04-02 19:18 ` Alexander Aring
  2024-04-02 19:18 ` [PATCHv4 dlm/next 15/15] dlm: do dlm message processing in softirq context Alexander Aring
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:18 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch converts all spinlocks involved in message parsing to their
_bh versions. This is done because message parsing is being moved into
softirq context, so these locks must not be interruptible by a softirq
while they are held.

All of these locks can also be held by a user context, e.g. the
misc_device of the dlm user space interface handling lock requests
coming from user space.
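Why the _bh variants are needed can be illustrated with a hedged kernel-style sketch (not actual DLM code; ls_cb_lock stands in for any lock shared between both contexts). A softirq that interrupts a process-context holder of the same spinlock deadlocks on that CPU:

```c
/* Process context (e.g. the dlm misc_device write path), plain lock: */
	spin_lock(&ls->ls_cb_lock);
	/* ... a softirq may interrupt right here ... */
	spin_unlock(&ls->ls_cb_lock);

/* Softirq context (message parsing after this series): */
	spin_lock(&ls->ls_cb_lock);	/* spins forever if it interrupted
					 * the process-context holder above */

/* Fix: disable bottom halves while holding any lock the softirq path
 * can also take, which is what the _bh conversions in this patch do: */
	spin_lock_bh(&ls->ls_cb_lock);	/* softirqs cannot run here */
	/* ... */
	spin_unlock_bh(&ls->ls_cb_lock);
```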

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/ast.c          |  12 +--
 fs/dlm/debug_fs.c     |  28 +++---
 fs/dlm/dir.c          |  24 ++---
 fs/dlm/lock.c         | 206 ++++++++++++++++++++++++------------------
 fs/dlm/lock.h         |   4 +-
 fs/dlm/lockspace.c    |  51 +++++------
 fs/dlm/lowcomms.c     |  16 ++--
 fs/dlm/member.c       |  22 ++---
 fs/dlm/midcomms.c     |  40 ++++----
 fs/dlm/rcom.c         |  26 +++---
 fs/dlm/recover.c      |  52 +++++------
 fs/dlm/recoverd.c     |  20 ++--
 fs/dlm/requestqueue.c |  12 +--
 fs/dlm/user.c         |  32 +++----
 14 files changed, 287 insertions(+), 258 deletions(-)

diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 03879c94fadb..59711486d801 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -142,12 +142,12 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 		cb->astparam = lkb->lkb_astparam;
 		INIT_WORK(&cb->work, dlm_callback_work);
 
-		spin_lock(&ls->ls_cb_lock);
+		spin_lock_bh(&ls->ls_cb_lock);
 		if (test_bit(LSFL_CB_DELAY, &ls->ls_flags))
 			list_add(&cb->list, &ls->ls_cb_delay);
 		else
 			queue_work(ls->ls_callback_wq, &cb->work);
-		spin_unlock(&ls->ls_cb_lock);
+		spin_unlock_bh(&ls->ls_cb_lock);
 		break;
 	case DLM_ENQUEUE_CALLBACK_SUCCESS:
 		break;
@@ -179,9 +179,9 @@ void dlm_callback_stop(struct dlm_ls *ls)
 void dlm_callback_suspend(struct dlm_ls *ls)
 {
 	if (ls->ls_callback_wq) {
-		spin_lock(&ls->ls_cb_lock);
+		spin_lock_bh(&ls->ls_cb_lock);
 		set_bit(LSFL_CB_DELAY, &ls->ls_flags);
-		spin_unlock(&ls->ls_cb_lock);
+		spin_unlock_bh(&ls->ls_cb_lock);
 
 		flush_workqueue(ls->ls_callback_wq);
 	}
@@ -199,7 +199,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
 		return;
 
 more:
-	spin_lock(&ls->ls_cb_lock);
+	spin_lock_bh(&ls->ls_cb_lock);
 	list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) {
 		list_del(&cb->list);
 		queue_work(ls->ls_callback_wq, &cb->work);
@@ -210,7 +210,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
 	empty = list_empty(&ls->ls_cb_delay);
 	if (empty)
 		clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
-	spin_unlock(&ls->ls_cb_lock);
+	spin_unlock_bh(&ls->ls_cb_lock);
 
 	sum += count;
 	if (!empty) {
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 487dcf05d076..cba5514688ee 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -452,7 +452,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 
 	tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
-	spin_lock(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
 	if (!RB_EMPTY_ROOT(tree)) {
 		for (node = rb_first(tree); node; node = rb_next(node)) {
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
@@ -460,12 +460,12 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 				dlm_hold_rsb(r);
 				ri->rsb = r;
 				ri->bucket = bucket;
-				spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+				spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 				return ri;
 			}
 		}
 	}
-	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 
 	/*
 	 * move to the first rsb in the next non-empty bucket
@@ -484,18 +484,18 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		}
 		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
-		spin_lock(&ls->ls_rsbtbl[bucket].lock);
+		spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
 		if (!RB_EMPTY_ROOT(tree)) {
 			node = rb_first(tree);
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
-			spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 			*pos = n;
 			return ri;
 		}
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 	}
 }
 
@@ -516,7 +516,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 	 * move to the next rsb in the same bucket
 	 */
 
-	spin_lock(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
 	rp = ri->rsb;
 	next = rb_next(&rp->res_hashnode);
 
@@ -524,12 +524,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 		r = rb_entry(next, struct dlm_rsb, res_hashnode);
 		dlm_hold_rsb(r);
 		ri->rsb = r;
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 		dlm_put_rsb(rp);
 		++*pos;
 		return ri;
 	}
-	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 	dlm_put_rsb(rp);
 
 	/*
@@ -550,18 +550,18 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 		}
 		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
-		spin_lock(&ls->ls_rsbtbl[bucket].lock);
+		spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
 		if (!RB_EMPTY_ROOT(tree)) {
 			next = rb_first(tree);
 			r = rb_entry(next, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
-			spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 			*pos = n;
 			return ri;
 		}
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 	}
 }
 
@@ -743,7 +743,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 		goto out;
 	}
 
-	spin_lock(&ls->ls_waiters_lock);
+	spin_lock_bh(&ls->ls_waiters_lock);
 	memset(debug_buf, 0, sizeof(debug_buf));
 
 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
@@ -754,7 +754,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 			break;
 		pos += ret;
 	}
-	spin_unlock(&ls->ls_waiters_lock);
+	spin_unlock_bh(&ls->ls_waiters_lock);
 	dlm_unlock_recovery(ls);
 
 	rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 0dc8a1d9e411..ff3a51c759b5 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -204,12 +204,12 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
 	hash = jhash(name, len, 0);
 	bucket = hash & (ls->ls_rsbtbl_size - 1);
 
-	spin_lock(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
 	if (rv)
 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
 					 name, len, &r);
-	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 
 	if (!rv)
 		return r;
@@ -245,7 +245,7 @@ static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
 {
 	struct dlm_dir_dump *dd, *safe;
 
-	write_lock(&ls->ls_dir_dump_lock);
+	write_lock_bh(&ls->ls_dir_dump_lock);
 	list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) {
 		if (dd->nodeid_init == nodeid) {
 			log_error(ls, "drop dump seq %llu",
@@ -254,21 +254,21 @@ static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
 			kfree(dd);
 		}
 	}
-	write_unlock(&ls->ls_dir_dump_lock);
+	write_unlock_bh(&ls->ls_dir_dump_lock);
 }
 
 static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid)
 {
 	struct dlm_dir_dump *iter, *dd = NULL;
 
-	read_lock(&ls->ls_dir_dump_lock);
+	read_lock_bh(&ls->ls_dir_dump_lock);
 	list_for_each_entry(iter, &ls->ls_dir_dump_list, list) {
 		if (iter->nodeid_init == nodeid) {
 			dd = iter;
 			break;
 		}
 	}
-	read_unlock(&ls->ls_dir_dump_lock);
+	read_unlock_bh(&ls->ls_dir_dump_lock);
 
 	return dd;
 }
@@ -291,9 +291,9 @@ static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid)
 	dd->seq_init = ls->ls_recover_seq;
 	dd->nodeid_init = nodeid;
 
-	write_lock(&ls->ls_dir_dump_lock);
+	write_lock_bh(&ls->ls_dir_dump_lock);
 	list_add(&dd->list, &ls->ls_dir_dump_list);
-	write_unlock(&ls->ls_dir_dump_lock);
+	write_unlock_bh(&ls->ls_dir_dump_lock);
 
 	return dd;
 }
@@ -311,7 +311,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 	struct dlm_dir_dump *dd;
 	__be16 be_namelen;
 
-	read_lock(&ls->ls_masters_lock);
+	read_lock_bh(&ls->ls_masters_lock);
 
 	if (inlen > 1) {
 		dd = lookup_dir_dump(ls, nodeid);
@@ -397,12 +397,12 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 		log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
 			  nodeid, dd->sent_res, dd->sent_msg);
 
-		write_lock(&ls->ls_dir_dump_lock);
+		write_lock_bh(&ls->ls_dir_dump_lock);
 		list_del_init(&dd->list);
-		write_unlock(&ls->ls_dir_dump_lock);
+		write_unlock_bh(&ls->ls_dir_dump_lock);
 		kfree(dd);
 	}
  out:
-	read_unlock(&ls->ls_masters_lock);
+	read_unlock_bh(&ls->ls_masters_lock);
 }
 
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index e4cec14f9973..4ff4ef2a5f87 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -333,6 +333,36 @@ void dlm_hold_rsb(struct dlm_rsb *r)
 	hold_rsb(r);
 }
 
+/* TODO move this to lib/refcount.c */
+static __must_check bool
+dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
+__cond_acquires(lock)
+{
+	if (refcount_dec_not_one(r))
+		return false;
+
+	spin_lock_bh(lock);
+	if (!refcount_dec_and_test(r)) {
+		spin_unlock_bh(lock);
+		return false;
+	}
+
+	return true;
+}
+
+/* TODO move this to include/linux/kref.h */
+static inline int dlm_kref_put_lock_bh(struct kref *kref,
+				       void (*release)(struct kref *kref),
+				       spinlock_t *lock)
+{
+	if (dlm_refcount_dec_and_lock_bh(&kref->refcount, lock)) {
+		release(kref);
+		return 1;
+	}
+
+	return 0;
+}
+
 /* When all references to the rsb are gone it's transferred to
    the tossed list for later disposal. */
 
@@ -342,10 +372,10 @@ static void put_rsb(struct dlm_rsb *r)
 	uint32_t bucket = r->res_bucket;
 	int rv;
 
-	rv = kref_put_lock(&r->res_ref, toss_rsb,
-			   &ls->ls_rsbtbl[bucket].lock);
+	rv = dlm_kref_put_lock_bh(&r->res_ref, toss_rsb,
+				  &ls->ls_rsbtbl[bucket].lock);
 	if (rv)
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 }
 
 void dlm_put_rsb(struct dlm_rsb *r)
@@ -358,17 +388,17 @@ static int pre_rsb_struct(struct dlm_ls *ls)
 	struct dlm_rsb *r1, *r2;
 	int count = 0;
 
-	spin_lock(&ls->ls_new_rsb_spin);
+	spin_lock_bh(&ls->ls_new_rsb_spin);
 	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
-		spin_unlock(&ls->ls_new_rsb_spin);
+		spin_unlock_bh(&ls->ls_new_rsb_spin);
 		return 0;
 	}
-	spin_unlock(&ls->ls_new_rsb_spin);
+	spin_unlock_bh(&ls->ls_new_rsb_spin);
 
 	r1 = dlm_allocate_rsb(ls);
 	r2 = dlm_allocate_rsb(ls);
 
-	spin_lock(&ls->ls_new_rsb_spin);
+	spin_lock_bh(&ls->ls_new_rsb_spin);
 	if (r1) {
 		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
 		ls->ls_new_rsb_count++;
@@ -378,7 +408,7 @@ static int pre_rsb_struct(struct dlm_ls *ls)
 		ls->ls_new_rsb_count++;
 	}
 	count = ls->ls_new_rsb_count;
-	spin_unlock(&ls->ls_new_rsb_spin);
+	spin_unlock_bh(&ls->ls_new_rsb_spin);
 
 	if (!count)
 		return -ENOMEM;
@@ -395,10 +425,10 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 	struct dlm_rsb *r;
 	int count;
 
-	spin_lock(&ls->ls_new_rsb_spin);
+	spin_lock_bh(&ls->ls_new_rsb_spin);
 	if (list_empty(&ls->ls_new_rsb)) {
 		count = ls->ls_new_rsb_count;
-		spin_unlock(&ls->ls_new_rsb_spin);
+		spin_unlock_bh(&ls->ls_new_rsb_spin);
 		log_debug(ls, "find_rsb retry %d %d %s",
 			  count, dlm_config.ci_new_rsb_count,
 			  (const char *)name);
@@ -410,7 +440,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
 	/* Convert the empty list_head to a NULL rb_node for tree usage: */
 	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
 	ls->ls_new_rsb_count--;
-	spin_unlock(&ls->ls_new_rsb_spin);
+	spin_unlock_bh(&ls->ls_new_rsb_spin);
 
 	r->res_ls = ls;
 	r->res_length = len;
@@ -585,7 +615,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 			goto out;
 	}
 
-	spin_lock(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (error)
@@ -655,7 +685,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		goto retry;
 	}
 	if (error)
@@ -704,7 +734,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
  out_add:
 	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
  out_unlock:
-	spin_unlock(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
  out:
 	*r_ret = r;
 	return error;
@@ -729,7 +759,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	if (error < 0)
 		goto out;
 
-	spin_lock(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (error)
@@ -787,7 +817,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		goto retry;
 	}
 	if (error)
@@ -802,7 +832,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
  out_unlock:
-	spin_unlock(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
  out:
 	*r_ret = r;
 	return error;
@@ -1019,7 +1049,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	if (error < 0)
 		return error;
 
-	spin_lock(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (!error) {
 		/* because the rsb is active, we need to lock_rsb before
@@ -1027,7 +1057,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		 */
 
 		hold_rsb(r);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		lock_rsb(r);
 
 		__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
@@ -1053,14 +1083,14 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 
 	r->res_toss_time = jiffies;
 	/* the rsb was inactive (on toss list) */
-	spin_unlock(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 
 	return 0;
 
  not_found:
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		goto retry;
 	}
 	if (error)
@@ -1078,7 +1108,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	if (error) {
 		/* should never happen */
 		dlm_free_rsb(r);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		goto retry;
 	}
 
@@ -1086,7 +1116,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 		*result = DLM_LU_ADD;
 	*r_nodeid = from_nodeid;
  out_unlock:
-	spin_unlock(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 	return error;
 }
 
@@ -1097,13 +1127,13 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 	int i;
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock(&ls->ls_rsbtbl[i].lock);
+		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			if (r->res_hash == hash)
 				dlm_dump_rsb(r);
 		}
-		spin_unlock(&ls->ls_rsbtbl[i].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
 }
 
@@ -1116,7 +1146,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
 	hash = jhash(name, len, 0);
 	b = hash & (ls->ls_rsbtbl_size - 1);
 
-	spin_lock(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (!error)
 		goto out_dump;
@@ -1127,7 +1157,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
  out_dump:
 	dlm_dump_rsb(r);
  out:
-	spin_unlock(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 }
 
 static void toss_rsb(struct kref *kref)
@@ -1208,11 +1238,11 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
 
-	spin_lock(&ls->ls_lkbidr_spin);
+	spin_lock_bh(&ls->ls_lkbidr_spin);
 	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
 	if (rv >= 0)
 		lkb->lkb_id = rv;
-	spin_unlock(&ls->ls_lkbidr_spin);
+	spin_unlock_bh(&ls->ls_lkbidr_spin);
 
 	if (rv < 0) {
 		log_error(ls, "create_lkb idr error %d", rv);
@@ -1233,11 +1263,11 @@ static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
 {
 	struct dlm_lkb *lkb;
 
-	spin_lock(&ls->ls_lkbidr_spin);
+	spin_lock_bh(&ls->ls_lkbidr_spin);
 	lkb = idr_find(&ls->ls_lkbidr, lkid);
 	if (lkb)
 		kref_get(&lkb->lkb_ref);
-	spin_unlock(&ls->ls_lkbidr_spin);
+	spin_unlock_bh(&ls->ls_lkbidr_spin);
 
 	*lkb_ret = lkb;
 	return lkb ? 0 : -ENOENT;
@@ -1261,11 +1291,11 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 	uint32_t lkid = lkb->lkb_id;
 	int rv;
 
-	rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
-			   &ls->ls_lkbidr_spin);
+	rv = dlm_kref_put_lock_bh(&lkb->lkb_ref, kill_lkb,
+				  &ls->ls_lkbidr_spin);
 	if (rv) {
 		idr_remove(&ls->ls_lkbidr, lkid);
-		spin_unlock(&ls->ls_lkbidr_spin);
+		spin_unlock_bh(&ls->ls_lkbidr_spin);
 
 		detach_lkb(lkb);
 
@@ -1406,7 +1436,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int error = 0;
 
-	spin_lock(&ls->ls_waiters_lock);
+	spin_lock_bh(&ls->ls_waiters_lock);
 
 	if (is_overlap_unlock(lkb) ||
 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
@@ -1449,7 +1479,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 		log_error(ls, "addwait error %x %d flags %x %d %d %s",
 			  lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
-	spin_unlock(&ls->ls_waiters_lock);
+	spin_unlock_bh(&ls->ls_waiters_lock);
 	return error;
 }
 
@@ -1549,9 +1579,9 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int error;
 
-	spin_lock(&ls->ls_waiters_lock);
+	spin_lock_bh(&ls->ls_waiters_lock);
 	error = _remove_from_waiters(lkb, mstype, NULL);
-	spin_unlock(&ls->ls_waiters_lock);
+	spin_unlock_bh(&ls->ls_waiters_lock);
 	return error;
 }
 
@@ -1569,13 +1599,13 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
 	int error;
 
 	if (!local)
-		spin_lock(&ls->ls_waiters_lock);
+		spin_lock_bh(&ls->ls_waiters_lock);
 	else
 		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
 			     !dlm_locking_stopped(ls));
 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
 	if (!local)
-		spin_unlock(&ls->ls_waiters_lock);
+		spin_unlock_bh(&ls->ls_waiters_lock);
 	return error;
 }
 
@@ -1591,10 +1621,10 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 	memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
 
-	spin_lock(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 
 	if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		return;
 	}
 
@@ -1651,7 +1681,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
 	else
 		clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
-	spin_unlock(&ls->ls_rsbtbl[b].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 
 	/*
 	 * While searching for rsb's to free, we found some that require
@@ -1666,16 +1696,16 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 		name = ls->ls_remove_names[i];
 		len = ls->ls_remove_lens[i];
 
-		spin_lock(&ls->ls_rsbtbl[b].lock);
+		spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
 		if (rv) {
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			log_debug(ls, "remove_name not toss %s", name);
 			continue;
 		}
 
 		if (r->res_master_nodeid != our_nodeid) {
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			log_debug(ls, "remove_name master %d dir %d our %d %s",
 				  r->res_master_nodeid, r->res_dir_nodeid,
 				  our_nodeid, name);
@@ -1684,7 +1714,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 		if (r->res_dir_nodeid == our_nodeid) {
 			/* should never happen */
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			log_error(ls, "remove_name dir %d master %d our %d %s",
 				  r->res_dir_nodeid, r->res_master_nodeid,
 				  our_nodeid, name);
@@ -1693,21 +1723,21 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
 
 		if (!time_after_eq(jiffies, r->res_toss_time +
 				   dlm_config.ci_toss_secs * HZ)) {
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			log_debug(ls, "remove_name toss_time %lu now %lu %s",
 				  r->res_toss_time, jiffies, name);
 			continue;
 		}
 
 		if (!kref_put(&r->res_ref, kill_rsb)) {
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			log_error(ls, "remove_name in use %s", name);
 			continue;
 		}
 
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
 		send_remove(r);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 
 		dlm_free_rsb(r);
 	}
@@ -4171,7 +4201,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	hash = jhash(name, len, 0);
 	b = hash & (ls->ls_rsbtbl_size - 1);
 
-	spin_lock(&ls->ls_rsbtbl[b].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[b].lock);
 
 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
 	if (rv) {
@@ -4181,7 +4211,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 			/* should not happen */
 			log_error(ls, "receive_remove from %d not found %s",
 				  from_nodeid, name);
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			return;
 		}
 		if (r->res_master_nodeid != from_nodeid) {
@@ -4189,14 +4219,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 			log_error(ls, "receive_remove keep from %d master %d",
 				  from_nodeid, r->res_master_nodeid);
 			dlm_print_rsb(r);
-			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 			return;
 		}
 
 		log_debug(ls, "receive_remove from %d master %d first %x %s",
 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
 			  name);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		return;
 	}
 
@@ -4204,19 +4234,19 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 		log_error(ls, "receive_remove toss from %d master %d",
 			  from_nodeid, r->res_master_nodeid);
 		dlm_print_rsb(r);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		return;
 	}
 
 	if (kref_put(&r->res_ref, kill_rsb)) {
 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 		dlm_free_rsb(r);
 	} else {
 		log_error(ls, "receive_remove from %d rsb ref error",
 			  from_nodeid);
 		dlm_print_rsb(r);
-		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
 	}
 }
 
@@ -4752,20 +4782,20 @@ static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
 				int nodeid)
 {
 try_again:
-	read_lock(&ls->ls_requestqueue_lock);
+	read_lock_bh(&ls->ls_requestqueue_lock);
 	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
 		/* If we were a member of this lockspace, left, and rejoined,
 		   other nodes may still be sending us messages from the
 		   lockspace generation before we left. */
 		if (WARN_ON_ONCE(!ls->ls_generation)) {
-			read_unlock(&ls->ls_requestqueue_lock);
+			read_unlock_bh(&ls->ls_requestqueue_lock);
 			log_limit(ls, "receive %d from %d ignore old gen",
 				  le32_to_cpu(ms->m_type), nodeid);
 			return;
 		}
 
-		read_unlock(&ls->ls_requestqueue_lock);
-		write_lock(&ls->ls_requestqueue_lock);
+		read_unlock_bh(&ls->ls_requestqueue_lock);
+		write_lock_bh(&ls->ls_requestqueue_lock);
 		/* recheck because we hold writelock now */
 		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
-			write_unlock(&ls->ls_requestqueue_lock);
+			write_unlock_bh(&ls->ls_requestqueue_lock);
@@ -4773,10 +4803,10 @@ static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
 		}
 
 		dlm_add_requestqueue(ls, nodeid, ms);
-		write_unlock(&ls->ls_requestqueue_lock);
+		write_unlock_bh(&ls->ls_requestqueue_lock);
 	} else {
 		_receive_message(ls, ms, 0);
-		read_unlock(&ls->ls_requestqueue_lock);
+		read_unlock_bh(&ls->ls_requestqueue_lock);
 	}
 }
 
@@ -4836,7 +4866,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
 	   be inactive (in this ls) before transitioning to recovery mode */
 
-	read_lock(&ls->ls_recv_active);
+	read_lock_bh(&ls->ls_recv_active);
 	if (hd->h_cmd == DLM_MSG)
 		dlm_receive_message(ls, &p->message, nodeid);
 	else if (hd->h_cmd == DLM_RCOM)
@@ -4844,7 +4874,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
 	else
 		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
 			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
-	read_unlock(&ls->ls_recv_active);
+	read_unlock_bh(&ls->ls_recv_active);
 
 	dlm_put_lockspace(ls);
 }
@@ -5004,7 +5034,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb = NULL, *iter;
 
-	spin_lock(&ls->ls_waiters_lock);
+	spin_lock_bh(&ls->ls_waiters_lock);
 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
 		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
 			hold_lkb(iter);
@@ -5012,7 +5042,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 			break;
 		}
 	}
-	spin_unlock(&ls->ls_waiters_lock);
+	spin_unlock_bh(&ls->ls_waiters_lock);
 
 	return lkb;
 }
@@ -5112,9 +5142,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 		}
 
 		/* Forcibly remove from waiters list */
-		spin_lock(&ls->ls_waiters_lock);
+		spin_lock_bh(&ls->ls_waiters_lock);
 		list_del_init(&lkb->lkb_wait_reply);
-		spin_unlock(&ls->ls_waiters_lock);
+		spin_unlock_bh(&ls->ls_waiters_lock);
 
 		/*
 		 * The lkb is now clear of all prior waiters state and can be
@@ -5284,7 +5314,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 	struct rb_node *n;
 	struct dlm_rsb *r;
 
-	spin_lock(&ls->ls_rsbtbl[bucket].lock);
+	spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
 	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
 
@@ -5295,10 +5325,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 			continue;
 		}
 		hold_rsb(r);
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 		return r;
 	}
-	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+	spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
 	return NULL;
 }
 
@@ -5642,10 +5672,10 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
 	}
 
 	/* add this new lkb to the per-process list of locks */
-	spin_lock(&ua->proc->locks_spin);
+	spin_lock_bh(&ua->proc->locks_spin);
 	hold_lkb(lkb);
 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
-	spin_unlock(&ua->proc->locks_spin);
+	spin_unlock_bh(&ua->proc->locks_spin);
 	do_put = false;
  out_put:
 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
@@ -5775,9 +5805,9 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 	 * for the proc locks list.
 	 */
 
-	spin_lock(&ua->proc->locks_spin);
+	spin_lock_bh(&ua->proc->locks_spin);
 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
-	spin_unlock(&ua->proc->locks_spin);
+	spin_unlock_bh(&ua->proc->locks_spin);
  out:
 	kfree(ua_tmp);
 	return rv;
@@ -5821,11 +5851,11 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 	if (error)
 		goto out_put;
 
-	spin_lock(&ua->proc->locks_spin);
+	spin_lock_bh(&ua->proc->locks_spin);
 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
 	if (!list_empty(&lkb->lkb_ownqueue))
 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
-	spin_unlock(&ua->proc->locks_spin);
+	spin_unlock_bh(&ua->proc->locks_spin);
  out_put:
 	trace_dlm_unlock_end(ls, lkb, flags, error);
 	dlm_put_lkb(lkb);
@@ -5976,7 +6006,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
 {
 	struct dlm_lkb *lkb = NULL;
 
-	spin_lock(&ls->ls_clear_proc_locks);
+	spin_lock_bh(&ls->ls_clear_proc_locks);
 	if (list_empty(&proc->locks))
 		goto out;
 
@@ -5988,7 +6018,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
 	else
 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
  out:
-	spin_unlock(&ls->ls_clear_proc_locks);
+	spin_unlock_bh(&ls->ls_clear_proc_locks);
 	return lkb;
 }
 
@@ -6025,7 +6055,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 		dlm_put_lkb(lkb);
 	}
 
-	spin_lock(&ls->ls_clear_proc_locks);
+	spin_lock_bh(&ls->ls_clear_proc_locks);
 
 	/* in-progress unlocks */
 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
@@ -6039,7 +6069,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 		dlm_free_cb(cb);
 	}
 
-	spin_unlock(&ls->ls_clear_proc_locks);
+	spin_unlock_bh(&ls->ls_clear_proc_locks);
 	dlm_unlock_recovery(ls);
 }
 
@@ -6050,13 +6080,13 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 
 	while (1) {
 		lkb = NULL;
-		spin_lock(&proc->locks_spin);
+		spin_lock_bh(&proc->locks_spin);
 		if (!list_empty(&proc->locks)) {
 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
 					 lkb_ownqueue);
 			list_del_init(&lkb->lkb_ownqueue);
 		}
-		spin_unlock(&proc->locks_spin);
+		spin_unlock_bh(&proc->locks_spin);
 
 		if (!lkb)
 			break;
@@ -6066,20 +6096,20 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 		dlm_put_lkb(lkb); /* ref from proc->locks list */
 	}
 
-	spin_lock(&proc->locks_spin);
+	spin_lock_bh(&proc->locks_spin);
 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
 		list_del_init(&lkb->lkb_ownqueue);
 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
 		dlm_put_lkb(lkb);
 	}
-	spin_unlock(&proc->locks_spin);
+	spin_unlock_bh(&proc->locks_spin);
 
-	spin_lock(&proc->asts_spin);
+	spin_lock_bh(&proc->asts_spin);
 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
 		list_del(&cb->list);
 		dlm_free_cb(cb);
 	}
-	spin_unlock(&proc->asts_spin);
+	spin_unlock_bh(&proc->asts_spin);
 }
 
 /* pid of 0 means purge all orphans */
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 0f6b2700c0da..45a74869810a 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -69,12 +69,12 @@ static inline int is_master(struct dlm_rsb *r)
 
 static inline void lock_rsb(struct dlm_rsb *r)
 {
-	spin_lock(&r->res_lock);
+	spin_lock_bh(&r->res_lock);
 }
 
 static inline void unlock_rsb(struct dlm_rsb *r)
 {
-	spin_unlock(&r->res_lock);
+	spin_unlock_bh(&r->res_lock);
 }
 
 #endif
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index c021bf684fbc..c3681a50decb 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -251,15 +251,15 @@ static struct dlm_ls *find_ls_to_scan(void)
 {
 	struct dlm_ls *ls;
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	list_for_each_entry(ls, &lslist, ls_list) {
 		if (time_after_eq(jiffies, ls->ls_scan_time +
 					    dlm_config.ci_scan_secs * HZ)) {
-			spin_unlock(&lslist_lock);
+			spin_unlock_bh(&lslist_lock);
 			return ls;
 		}
 	}
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 	return NULL;
 }
 
@@ -306,7 +306,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 {
 	struct dlm_ls *ls;
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 
 	list_for_each_entry(ls, &lslist, ls_list) {
 		if (ls->ls_global_id == id) {
@@ -316,7 +316,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 	}
 	ls = NULL;
  out:
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 	return ls;
 }
 
@@ -324,7 +324,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 {
 	struct dlm_ls *ls;
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	list_for_each_entry(ls, &lslist, ls_list) {
 		if (ls->ls_local_handle == lockspace) {
 			atomic_inc(&ls->ls_count);
@@ -333,7 +333,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 	}
 	ls = NULL;
  out:
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 	return ls;
 }
 
@@ -341,7 +341,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor)
 {
 	struct dlm_ls *ls;
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	list_for_each_entry(ls, &lslist, ls_list) {
 		if (ls->ls_device.minor == minor) {
 			atomic_inc(&ls->ls_count);
@@ -350,7 +350,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor)
 	}
 	ls = NULL;
  out:
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 	return ls;
 }
 
@@ -365,15 +365,15 @@ static void remove_lockspace(struct dlm_ls *ls)
 retry:
 	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	if (atomic_read(&ls->ls_count) != 0) {
-		spin_unlock(&lslist_lock);
+		spin_unlock_bh(&lslist_lock);
 		goto retry;
 	}
 
 	WARN_ON(ls->ls_create_count != 0);
 	list_del(&ls->ls_list);
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 }
 
 static int threads_start(void)
@@ -448,7 +448,7 @@ static int new_lockspace(const char *name, const char *cluster,
 
 	error = 0;
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	list_for_each_entry(ls, &lslist, ls_list) {
 		WARN_ON(ls->ls_create_count <= 0);
 		if (ls->ls_namelen != namelen)
@@ -464,7 +464,7 @@ static int new_lockspace(const char *name, const char *cluster,
 		error = 1;
 		break;
 	}
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 
 	if (error)
 		goto out;
@@ -583,10 +583,10 @@ static int new_lockspace(const char *name, const char *cluster,
 	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
 	rwlock_init(&ls->ls_dir_dump_lock);
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	ls->ls_create_count = 1;
 	list_add(&ls->ls_list, &lslist);
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 
 	if (flags & DLM_LSFL_FS) {
 		error = dlm_callback_start(ls);
@@ -655,9 +655,9 @@ static int new_lockspace(const char *name, const char *cluster,
  out_callback:
 	dlm_callback_stop(ls);
  out_delist:
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	list_del(&ls->ls_list);
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 	idr_destroy(&ls->ls_recover_idr);
 	kfree(ls->ls_recover_buf);
  out_lkbidr:
@@ -756,7 +756,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 {
 	int rv;
 
-	spin_lock(&ls->ls_lkbidr_spin);
+	spin_lock_bh(&ls->ls_lkbidr_spin);
 	if (force == 0) {
 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 	} else if (force == 1) {
@@ -764,7 +764,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 	} else {
 		rv = 0;
 	}
-	spin_unlock(&ls->ls_lkbidr_spin);
+	spin_unlock_bh(&ls->ls_lkbidr_spin);
 	return rv;
 }
 
@@ -776,7 +776,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 
 	busy = lockspace_busy(ls, force);
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	if (ls->ls_create_count == 1) {
 		if (busy) {
 			rv = -EBUSY;
@@ -790,7 +790,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	} else {
 		rv = -EINVAL;
 	}
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 
 	if (rv) {
 		log_debug(ls, "release_lockspace no remove %d", rv);
@@ -918,20 +918,19 @@ void dlm_stop_lockspaces(void)
 
  restart:
 	count = 0;
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	list_for_each_entry(ls, &lslist, ls_list) {
 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
 			count++;
 			continue;
 		}
-		spin_unlock(&lslist_lock);
+		spin_unlock_bh(&lslist_lock);
 		log_error(ls, "no userland control daemon, stopping lockspace");
 		dlm_ls_stop(ls);
 		goto restart;
 	}
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 
 	if (count)
 		log_print("dlm user daemon left %d lockspaces", count);
 }
-
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index ab2cfbd2ea77..444dc858c4a4 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -867,36 +867,36 @@ static void process_dlm_messages(struct work_struct *work)
 {
 	struct processqueue_entry *pentry;
 
-	spin_lock(&processqueue_lock);
+	spin_lock_bh(&processqueue_lock);
 	pentry = list_first_entry_or_null(&processqueue,
 					  struct processqueue_entry, list);
 	if (WARN_ON_ONCE(!pentry)) {
 		process_dlm_messages_pending = false;
-		spin_unlock(&processqueue_lock);
+		spin_unlock_bh(&processqueue_lock);
 		return;
 	}
 
 	list_del(&pentry->list);
 	atomic_dec(&processqueue_count);
-	spin_unlock(&processqueue_lock);
+	spin_unlock_bh(&processqueue_lock);
 
 	for (;;) {
 		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
 					    pentry->buflen);
 		free_processqueue_entry(pentry);
 
-		spin_lock(&processqueue_lock);
+		spin_lock_bh(&processqueue_lock);
 		pentry = list_first_entry_or_null(&processqueue,
 						  struct processqueue_entry, list);
 		if (!pentry) {
 			process_dlm_messages_pending = false;
-			spin_unlock(&processqueue_lock);
+			spin_unlock_bh(&processqueue_lock);
 			break;
 		}
 
 		list_del(&pentry->list);
 		atomic_dec(&processqueue_count);
-		spin_unlock(&processqueue_lock);
+		spin_unlock_bh(&processqueue_lock);
 	}
 }
 
@@ -966,14 +966,14 @@ static int receive_from_sock(struct connection *con, int buflen)
 	memmove(con->rx_leftover_buf, pentry->buf + ret,
 		con->rx_leftover);
 
-	spin_lock(&processqueue_lock);
+	spin_lock_bh(&processqueue_lock);
 	ret = atomic_inc_return(&processqueue_count);
 	list_add_tail(&pentry->list, &processqueue);
 	if (!process_dlm_messages_pending) {
 		process_dlm_messages_pending = true;
 		queue_work(process_workqueue, &process_work);
 	}
-	spin_unlock(&processqueue_lock);
+	spin_unlock_bh(&processqueue_lock);
 
 	if (ret > DLM_MAX_PROCESS_BUFFERS)
 		return DLM_IO_FLUSH;
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index ac1b555af9d6..6401916a97ef 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -630,7 +630,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	 * message to the requestqueue without races.
 	 */
 
-	write_lock(&ls->ls_recv_active);
+	write_lock_bh(&ls->ls_recv_active);
 
 	/*
 	 * Abort any recovery that's in progress (see RECOVER_STOP,
@@ -638,23 +638,23 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
 	 */
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
 	ls->ls_recover_seq++;
 
 	/* activate requestqueue and stop processing */
-	write_lock(&ls->ls_requestqueue_lock);
+	write_lock_bh(&ls->ls_requestqueue_lock);
 	set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
-	write_unlock(&ls->ls_requestqueue_lock);
-	spin_unlock(&ls->ls_recover_lock);
+	write_unlock_bh(&ls->ls_requestqueue_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
 	/*
 	 * Let dlm_recv run again, now any normal messages will be saved on the
 	 * requestqueue for later.
 	 */
 
-	write_unlock(&ls->ls_recv_active);
+	write_unlock_bh(&ls->ls_recv_active);
 
 	/*
 	 * This in_recovery lock does two things:
@@ -679,13 +679,13 @@ int dlm_ls_stop(struct dlm_ls *ls)
 
 	dlm_recoverd_suspend(ls);
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	kfree(ls->ls_slots);
 	ls->ls_slots = NULL;
 	ls->ls_num_slots = 0;
 	ls->ls_slots_size = 0;
 	ls->ls_recover_status = 0;
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
 	dlm_recoverd_resume(ls);
 
@@ -719,12 +719,12 @@ int dlm_ls_start(struct dlm_ls *ls)
 	if (error < 0)
 		goto fail_rv;
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 
 	/* the lockspace needs to be stopped before it can be started */
 
 	if (!dlm_locking_stopped(ls)) {
-		spin_unlock(&ls->ls_recover_lock);
+		spin_unlock_bh(&ls->ls_recover_lock);
 		log_error(ls, "start ignored: lockspace running");
 		error = -EINVAL;
 		goto fail;
@@ -735,7 +735,7 @@ int dlm_ls_start(struct dlm_ls *ls)
 	rv->seq = ++ls->ls_recover_seq;
 	rv_old = ls->ls_recover_args;
 	ls->ls_recover_args = rv;
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
 	if (rv_old) {
 		log_error(ls, "unused recovery %llx %d",
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index ed6fb9b9a582..c34f38e9ee5c 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -364,9 +364,9 @@ int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
 	node->users = 0;
 	midcomms_node_reset(node);
 
-	spin_lock(&nodes_lock);
+	spin_lock_bh(&nodes_lock);
 	hlist_add_head_rcu(&node->hlist, &node_hash[r]);
-	spin_unlock(&nodes_lock);
+	spin_unlock_bh(&nodes_lock);
 
 	node->debugfs = dlm_create_debug_comms_file(nodeid, node);
 	return 0;
@@ -477,7 +477,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
 
 static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
 {
-	spin_lock(&node->state_lock);
+	spin_lock_bh(&node->state_lock);
 	pr_debug("receive passive fin ack from node %d with state %s\n",
 		 node->nodeid, dlm_state_str(node->state));
 
@@ -491,13 +491,13 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
 		wake_up(&node->shutdown_wait);
 		break;
 	default:
-		spin_unlock(&node->state_lock);
+		spin_unlock_bh(&node->state_lock);
 		log_print("%s: unexpected state: %d",
 			  __func__, node->state);
 		WARN_ON_ONCE(1);
 		return;
 	}
-	spin_unlock(&node->state_lock);
+	spin_unlock_bh(&node->state_lock);
 }
 
 static void dlm_receive_buffer_3_2_trace(uint32_t seq,
@@ -534,7 +534,7 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
 	if (is_expected_seq) {
 		switch (p->header.h_cmd) {
 		case DLM_FIN:
-			spin_lock(&node->state_lock);
+			spin_lock_bh(&node->state_lock);
 			pr_debug("receive fin msg from node %d with state %s\n",
 				 node->nodeid, dlm_state_str(node->state));
 
@@ -575,13 +575,13 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
 				/* probably remove_member caught it, do nothing */
 				break;
 			default:
-				spin_unlock(&node->state_lock);
+				spin_unlock_bh(&node->state_lock);
 				log_print("%s: unexpected state: %d",
 					  __func__, node->state);
 				WARN_ON_ONCE(1);
 				return;
 			}
-			spin_unlock(&node->state_lock);
+			spin_unlock_bh(&node->state_lock);
 			break;
 		default:
 			WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
@@ -1182,7 +1182,7 @@ void dlm_midcomms_exit(void)
 
 static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
 {
-	spin_lock(&node->state_lock);
+	spin_lock_bh(&node->state_lock);
 	pr_debug("receive active fin ack from node %d with state %s\n",
 		 node->nodeid, dlm_state_str(node->state));
 
@@ -1202,13 +1202,13 @@ static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
 		wake_up(&node->shutdown_wait);
 		break;
 	default:
-		spin_unlock(&node->state_lock);
+		spin_unlock_bh(&node->state_lock);
 		log_print("%s: unexpected state: %d",
 			  __func__, node->state);
 		WARN_ON_ONCE(1);
 		return;
 	}
-	spin_unlock(&node->state_lock);
+	spin_unlock_bh(&node->state_lock);
 }
 
 void dlm_midcomms_add_member(int nodeid)
@@ -1223,7 +1223,7 @@ void dlm_midcomms_add_member(int nodeid)
 		return;
 	}
 
-	spin_lock(&node->state_lock);
+	spin_lock_bh(&node->state_lock);
 	if (!node->users) {
 		pr_debug("receive add member from node %d with state %s\n",
 			 node->nodeid, dlm_state_str(node->state));
@@ -1251,7 +1251,7 @@ void dlm_midcomms_add_member(int nodeid)
 
 	node->users++;
 	pr_debug("node %d users inc count %d\n", nodeid, node->users);
-	spin_unlock(&node->state_lock);
+	spin_unlock_bh(&node->state_lock);
 
 	srcu_read_unlock(&nodes_srcu, idx);
 }
@@ -1269,13 +1269,13 @@ void dlm_midcomms_remove_member(int nodeid)
 		return;
 	}
 
-	spin_lock(&node->state_lock);
+	spin_lock_bh(&node->state_lock);
 	/* case of dlm_midcomms_addr() created node but
 	 * was not added before because dlm_midcomms_close()
 	 * removed the node
 	 */
 	if (!node->users) {
-		spin_unlock(&node->state_lock);
+		spin_unlock_bh(&node->state_lock);
 		srcu_read_unlock(&nodes_srcu, idx);
 		return;
 	}
@@ -1313,7 +1313,7 @@ void dlm_midcomms_remove_member(int nodeid)
 			break;
 		}
 	}
-	spin_unlock(&node->state_lock);
+	spin_unlock_bh(&node->state_lock);
 
 	srcu_read_unlock(&nodes_srcu, idx);
 }
@@ -1351,7 +1351,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
 		return;
 	}
 
-	spin_lock(&node->state_lock);
+	spin_lock_bh(&node->state_lock);
 	pr_debug("receive active shutdown for node %d with state %s\n",
 		 node->nodeid, dlm_state_str(node->state));
 	switch (node->state) {
@@ -1370,7 +1370,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
 		 */
 		break;
 	}
-	spin_unlock(&node->state_lock);
+	spin_unlock_bh(&node->state_lock);
 
 	if (DLM_DEBUG_FENCE_TERMINATION)
 		msleep(5000);
@@ -1441,9 +1441,9 @@ int dlm_midcomms_close(int nodeid)
 	ret = dlm_lowcomms_close(nodeid);
 	dlm_delete_debug_comms_file(node->debugfs);
 
-	spin_lock(&nodes_lock);
+	spin_lock_bh(&nodes_lock);
 	hlist_del_rcu(&node->hlist);
-	spin_unlock(&nodes_lock);
+	spin_unlock_bh(&nodes_lock);
 	srcu_read_unlock(&nodes_srcu, idx);
 
 	/* wait that all readers left until flush send queue */
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 2e3f529f3ff2..be1a71a6303a 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -143,18 +143,18 @@ static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 
 static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq)
 {
-	spin_lock(&ls->ls_rcom_spin);
+	spin_lock_bh(&ls->ls_rcom_spin);
 	*new_seq = cpu_to_le64(++ls->ls_rcom_seq);
 	set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
-	spin_unlock(&ls->ls_rcom_spin);
+	spin_unlock_bh(&ls->ls_rcom_spin);
 }
 
 static void disallow_sync_reply(struct dlm_ls *ls)
 {
-	spin_lock(&ls->ls_rcom_spin);
+	spin_lock_bh(&ls->ls_rcom_spin);
 	clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 	clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
-	spin_unlock(&ls->ls_rcom_spin);
+	spin_unlock_bh(&ls->ls_rcom_spin);
 }
 
 /*
@@ -245,10 +245,10 @@ static void receive_rcom_status(struct dlm_ls *ls,
 		goto do_create;
 	}
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	status = ls->ls_recover_status;
 	num_slots = ls->ls_num_slots;
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 	len += num_slots * sizeof(struct rcom_slot);
 
  do_create:
@@ -266,9 +266,9 @@ static void receive_rcom_status(struct dlm_ls *ls,
 	if (!num_slots)
 		goto do_send;
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	if (ls->ls_num_slots != num_slots) {
-		spin_unlock(&ls->ls_recover_lock);
+		spin_unlock_bh(&ls->ls_recover_lock);
 		log_debug(ls, "receive_rcom_status num_slots %d to %d",
 			  num_slots, ls->ls_num_slots);
 		rc->rc_result = 0;
@@ -277,7 +277,7 @@ static void receive_rcom_status(struct dlm_ls *ls,
 	}
 
 	dlm_slots_copy_out(ls, rc);
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
  do_send:
 	send_rcom_stateless(msg, rc);
@@ -285,7 +285,7 @@ static void receive_rcom_status(struct dlm_ls *ls,
 
 static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
 {
-	spin_lock(&ls->ls_rcom_spin);
+	spin_lock_bh(&ls->ls_rcom_spin);
 	if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
 	    le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) {
 		log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
@@ -301,7 +301,7 @@ static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
 	clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 	wake_up(&ls->ls_wait_general);
  out:
-	spin_unlock(&ls->ls_rcom_spin);
+	spin_unlock_bh(&ls->ls_rcom_spin);
 }
 
 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
@@ -613,11 +613,11 @@ void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid)
 		break;
 	}
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	status = ls->ls_recover_status;
 	stop = dlm_recovery_stopped(ls);
 	seq = ls->ls_recover_seq;
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
 	if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS)))
 		goto ignore;
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 172c6b73f37a..13bc845fa305 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -74,9 +74,9 @@ int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
 uint32_t dlm_recover_status(struct dlm_ls *ls)
 {
 	uint32_t status;
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	status = ls->ls_recover_status;
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 	return status;
 }
 
@@ -87,9 +87,9 @@ static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
 
 void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
 {
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	_set_recover_status(ls, status);
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 }
 
 static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
@@ -188,13 +188,13 @@ int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq)
 
 		rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
 		if (!rv) {
-			spin_lock(&ls->ls_recover_lock);
+			spin_lock_bh(&ls->ls_recover_lock);
 			_set_recover_status(ls, DLM_RS_NODES_ALL);
 			ls->ls_num_slots = num_slots;
 			ls->ls_slots_size = slots_size;
 			ls->ls_slots = slots;
 			ls->ls_generation = gen;
-			spin_unlock(&ls->ls_recover_lock);
+			spin_unlock_bh(&ls->ls_recover_lock);
 		} else {
 			dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
 		}
@@ -241,9 +241,9 @@ static int recover_list_empty(struct dlm_ls *ls)
 {
 	int empty;
 
-	spin_lock(&ls->ls_recover_list_lock);
+	spin_lock_bh(&ls->ls_recover_list_lock);
 	empty = list_empty(&ls->ls_recover_list);
-	spin_unlock(&ls->ls_recover_list_lock);
+	spin_unlock_bh(&ls->ls_recover_list_lock);
 
 	return empty;
 }
@@ -252,23 +252,23 @@ static void recover_list_add(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
 
-	spin_lock(&ls->ls_recover_list_lock);
+	spin_lock_bh(&ls->ls_recover_list_lock);
 	if (list_empty(&r->res_recover_list)) {
 		list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
 		ls->ls_recover_list_count++;
 		dlm_hold_rsb(r);
 	}
-	spin_unlock(&ls->ls_recover_list_lock);
+	spin_unlock_bh(&ls->ls_recover_list_lock);
 }
 
 static void recover_list_del(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
 
-	spin_lock(&ls->ls_recover_list_lock);
+	spin_lock_bh(&ls->ls_recover_list_lock);
 	list_del_init(&r->res_recover_list);
 	ls->ls_recover_list_count--;
-	spin_unlock(&ls->ls_recover_list_lock);
+	spin_unlock_bh(&ls->ls_recover_list_lock);
 
 	dlm_put_rsb(r);
 }
@@ -277,7 +277,7 @@ static void recover_list_clear(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r, *s;
 
-	spin_lock(&ls->ls_recover_list_lock);
+	spin_lock_bh(&ls->ls_recover_list_lock);
 	list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
 		list_del_init(&r->res_recover_list);
 		r->res_recover_locks_count = 0;
@@ -290,17 +290,17 @@ static void recover_list_clear(struct dlm_ls *ls)
 			  ls->ls_recover_list_count);
 		ls->ls_recover_list_count = 0;
 	}
-	spin_unlock(&ls->ls_recover_list_lock);
+	spin_unlock_bh(&ls->ls_recover_list_lock);
 }
 
 static int recover_idr_empty(struct dlm_ls *ls)
 {
 	int empty = 1;
 
-	spin_lock(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_idr_lock);
 	if (ls->ls_recover_list_count)
 		empty = 0;
-	spin_unlock(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_idr_lock);
 
 	return empty;
 }
@@ -310,7 +310,7 @@ static int recover_idr_add(struct dlm_rsb *r)
 	struct dlm_ls *ls = r->res_ls;
 	int rv;
 
-	spin_lock(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_idr_lock);
 	if (r->res_id) {
 		rv = -1;
 		goto out_unlock;
@@ -324,7 +324,7 @@ static int recover_idr_add(struct dlm_rsb *r)
 	dlm_hold_rsb(r);
 	rv = 0;
 out_unlock:
-	spin_unlock(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_idr_lock);
 	return rv;
 }
 
@@ -332,11 +332,11 @@ static void recover_idr_del(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
 
-	spin_lock(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_idr_lock);
 	idr_remove(&ls->ls_recover_idr, r->res_id);
 	r->res_id = 0;
 	ls->ls_recover_list_count--;
-	spin_unlock(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_idr_lock);
 
 	dlm_put_rsb(r);
 }
@@ -345,9 +345,9 @@ static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
 {
 	struct dlm_rsb *r;
 
-	spin_lock(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_idr_lock);
 	r = idr_find(&ls->ls_recover_idr, (int)id);
-	spin_unlock(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_idr_lock);
 	return r;
 }
 
@@ -356,7 +356,7 @@ static void recover_idr_clear(struct dlm_ls *ls)
 	struct dlm_rsb *r;
 	int id;
 
-	spin_lock(&ls->ls_recover_idr_lock);
+	spin_lock_bh(&ls->ls_recover_idr_lock);
 
 	idr_for_each_entry(&ls->ls_recover_idr, r, id) {
 		idr_remove(&ls->ls_recover_idr, id);
@@ -372,7 +372,7 @@ static void recover_idr_clear(struct dlm_ls *ls)
 			  ls->ls_recover_list_count);
 		ls->ls_recover_list_count = 0;
 	}
-	spin_unlock(&ls->ls_recover_idr_lock);
+	spin_unlock_bh(&ls->ls_recover_idr_lock);
 }
 
 
@@ -887,7 +887,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
 	int i;
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		spin_lock(&ls->ls_rsbtbl[i].lock);
+		spin_lock_bh(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
 			next = rb_next(n);
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
@@ -895,7 +895,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
 			dlm_free_rsb(r);
 			count++;
 		}
-		spin_unlock(&ls->ls_rsbtbl[i].lock);
+		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
 
 	if (count)
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index a11ae1da2f60..c82cc48988c6 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -26,7 +26,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 	struct dlm_rsb *r;
 	int i, error = 0;
 
-	write_lock(&ls->ls_masters_lock);
+	write_lock_bh(&ls->ls_masters_lock);
 	if (!list_empty(&ls->ls_masters_list)) {
 		log_error(ls, "root list not empty");
 		error = -EINVAL;
@@ -46,7 +46,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 		spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
 	}
  out:
-	write_unlock(&ls->ls_masters_lock);
+	write_unlock_bh(&ls->ls_masters_lock);
 	return error;
 }
 
@@ -54,12 +54,12 @@ static void dlm_release_masters_list(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r, *safe;
 
-	write_lock(&ls->ls_masters_lock);
+	write_lock_bh(&ls->ls_masters_lock);
 	list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) {
 		list_del_init(&r->res_masters_list);
 		dlm_put_rsb(r);
 	}
-	write_unlock(&ls->ls_masters_lock);
+	write_unlock_bh(&ls->ls_masters_lock);
 }
 
 static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
@@ -103,9 +103,9 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 {
 	int error = -EINTR;
 
-	write_lock(&ls->ls_recv_active);
+	write_lock_bh(&ls->ls_recv_active);
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	if (ls->ls_recover_seq == seq) {
 		set_bit(LSFL_RUNNING, &ls->ls_flags);
 		/* unblocks processes waiting to enter the dlm */
@@ -113,9 +113,9 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 		clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
 		error = 0;
 	}
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
-	write_unlock(&ls->ls_recv_active);
+	write_unlock_bh(&ls->ls_recv_active);
 	return error;
 }
 
@@ -349,12 +349,12 @@ static void do_ls_recovery(struct dlm_ls *ls)
 	struct dlm_recover *rv = NULL;
 	int error;
 
-	spin_lock(&ls->ls_recover_lock);
+	spin_lock_bh(&ls->ls_recover_lock);
 	rv = ls->ls_recover_args;
 	ls->ls_recover_args = NULL;
 	if (rv && ls->ls_recover_seq == rv->seq)
 		clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
-	spin_unlock(&ls->ls_recover_lock);
+	spin_unlock_bh(&ls->ls_recover_lock);
 
 	if (rv) {
 		error = ls_recover(ls, rv);
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 9b646026df46..719a5243a069 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -68,7 +68,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 	struct dlm_message *ms;
 	int error = 0;
 
-	write_lock(&ls->ls_requestqueue_lock);
+	write_lock_bh(&ls->ls_requestqueue_lock);
 	for (;;) {
 		if (list_empty(&ls->ls_requestqueue)) {
 			clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
@@ -96,11 +96,11 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 			error = -EINTR;
 			break;
 		}
-		write_unlock(&ls->ls_requestqueue_lock);
+		write_unlock_bh(&ls->ls_requestqueue_lock);
 		schedule();
-		write_lock(&ls->ls_requestqueue_lock);
+		write_lock_bh(&ls->ls_requestqueue_lock);
 	}
-	write_unlock(&ls->ls_requestqueue_lock);
+	write_unlock_bh(&ls->ls_requestqueue_lock);
 
 	return error;
 }
@@ -135,7 +135,7 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
 	struct dlm_message *ms;
 	struct rq_entry *e, *safe;
 
-	write_lock(&ls->ls_requestqueue_lock);
+	write_lock_bh(&ls->ls_requestqueue_lock);
 	list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
 		ms =  &e->request;
 
@@ -144,6 +144,6 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
 			kfree(e);
 		}
 	}
-	write_unlock(&ls->ls_requestqueue_lock);
+	write_unlock_bh(&ls->ls_requestqueue_lock);
 }
 
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index b4971ba4bdd6..3173b974e8c8 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -189,7 +189,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
 		return;
 
 	ls = lkb->lkb_resource->res_ls;
-	spin_lock(&ls->ls_clear_proc_locks);
+	spin_lock_bh(&ls->ls_clear_proc_locks);
 
 	/* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
 	   can't be delivered.  For ORPHAN's, dlm_clear_proc_locks() freed
@@ -211,7 +211,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
 	if ((flags & DLM_CB_CAST) && lkb_is_endoflife(mode, status))
 		set_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags);
 
-	spin_lock(&proc->asts_spin);
+	spin_lock_bh(&proc->asts_spin);
 
 	rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, &cb);
 	switch (rv) {
@@ -232,23 +232,23 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
 	case DLM_ENQUEUE_CALLBACK_FAILURE:
 		fallthrough;
 	default:
-		spin_unlock(&proc->asts_spin);
+		spin_unlock_bh(&proc->asts_spin);
 		WARN_ON_ONCE(1);
 		goto out;
 	}
-	spin_unlock(&proc->asts_spin);
+	spin_unlock_bh(&proc->asts_spin);
 
 	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
 		/* N.B. spin_lock locks_spin, not asts_spin */
-		spin_lock(&proc->locks_spin);
+		spin_lock_bh(&proc->locks_spin);
 		if (!list_empty(&lkb->lkb_ownqueue)) {
 			list_del_init(&lkb->lkb_ownqueue);
 			dlm_put_lkb(lkb);
 		}
-		spin_unlock(&proc->locks_spin);
+		spin_unlock_bh(&proc->locks_spin);
 	}
  out:
-	spin_unlock(&ls->ls_clear_proc_locks);
+	spin_unlock_bh(&ls->ls_clear_proc_locks);
 }
 
 static int device_user_lock(struct dlm_user_proc *proc,
@@ -817,10 +817,10 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
 	if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
 		return -EINVAL;
 
-	spin_lock(&proc->asts_spin);
+	spin_lock_bh(&proc->asts_spin);
 	if (list_empty(&proc->asts)) {
 		if (file->f_flags & O_NONBLOCK) {
-			spin_unlock(&proc->asts_spin);
+			spin_unlock_bh(&proc->asts_spin);
 			return -EAGAIN;
 		}
 
@@ -829,16 +829,16 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
 	repeat:
 		set_current_state(TASK_INTERRUPTIBLE);
 		if (list_empty(&proc->asts) && !signal_pending(current)) {
-			spin_unlock(&proc->asts_spin);
+			spin_unlock_bh(&proc->asts_spin);
 			schedule();
-			spin_lock(&proc->asts_spin);
+			spin_lock_bh(&proc->asts_spin);
 			goto repeat;
 		}
 		set_current_state(TASK_RUNNING);
 		remove_wait_queue(&proc->wait, &wait);
 
 		if (signal_pending(current)) {
-			spin_unlock(&proc->asts_spin);
+			spin_unlock_bh(&proc->asts_spin);
 			return -ERESTARTSYS;
 		}
 	}
@@ -849,7 +849,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
 
 	cb = list_first_entry(&proc->asts, struct dlm_callback, list);
 	list_del(&cb->list);
-	spin_unlock(&proc->asts_spin);
+	spin_unlock_bh(&proc->asts_spin);
 
 	if (cb->flags & DLM_CB_BAST) {
 		trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name,
@@ -874,12 +874,12 @@ static __poll_t device_poll(struct file *file, poll_table *wait)
 
 	poll_wait(file, &proc->wait, wait);
 
-	spin_lock(&proc->asts_spin);
+	spin_lock_bh(&proc->asts_spin);
 	if (!list_empty(&proc->asts)) {
-		spin_unlock(&proc->asts_spin);
+		spin_unlock_bh(&proc->asts_spin);
 		return EPOLLIN | EPOLLRDNORM;
 	}
-	spin_unlock(&proc->asts_spin);
+	spin_unlock_bh(&proc->asts_spin);
 	return 0;
 }
 
-- 
2.43.0



* [PATCHv4 dlm/next 15/15] dlm: do dlm message processing in softirq context
  2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
                   ` (13 preceding siblings ...)
  2024-04-02 19:18 ` [PATCHv4 dlm/next 14/15] dlm: convert message parsing locks to disable bh Alexander Aring
@ 2024-04-02 19:18 ` Alexander Aring
  14 siblings, 0 replies; 16+ messages in thread
From: Alexander Aring @ 2024-04-02 19:18 UTC (permalink / raw)
  To: teigland; +Cc: gfs2, aahringo

This patch moves the dlm message processing from an ordered workqueue
context to an ordered softirq context. Later we want to call the user
defined ast/bast callbacks directly inside the dlm message processing
context instead of doing an additional context switch to the existing
callback workqueue. This should slightly improve the dlm message
parsing behaviour. There are two main reasons for this change:

1.
   Allow fewer scheduling points in the dlm message parsing context.
   This should deliver faster DLM user responses to ast/bast callbacks.
   Fewer interruptions of lock request processing (which might trigger
   new lock requests) avoid situations in which lock requests are never
   finished. In future the DLM callback workqueue can be disabled by
   a kernel lockspace flag that signals the DLM kernel user is capable
   of executing the callbacks in softirq context. If this flag is set,
   the dlm processing gets rid of an additional queue_work() context
   switch and takes full advantage of the new softirq context, because
   the last preemption possibility is removed from the message
   processing context.

2. Bringing the ast/bast callback to softirq context makes the user
   aware that it must not block in this context. Later patches will
   introduce a per lockspace flag to signal that the user is capable
   of handling these callbacks in softirq context, to preserve
   backwards compatibility. Handling the callback in the receive path
   and not in a workqueue removes an unnecessary context switch.
   Signaling the user that the callback can run in softirq context
   ensures that the DLM user will not sleep in such a context and
   leaves it "as fast as possible" again.

Further patches will unveil more improvements on the way to a
per-message softirq parsing context, especially once DLM reaches a
state in which concurrent message parsing is allowed.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 28 ++++++++++++++++++++--------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 444dc858c4a4..6b8078085e56 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -204,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work);
 static DECLARE_WORK(process_work, process_dlm_messages);
 static DEFINE_SPINLOCK(processqueue_lock);
 static bool process_dlm_messages_pending;
+static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
 static atomic_t processqueue_count;
 static LIST_HEAD(processqueue);
 
@@ -877,7 +878,8 @@ static void process_dlm_messages(struct work_struct *work)
 	}
 
 	list_del(&pentry->list);
-	atomic_dec(&processqueue_count);
+	if (atomic_dec_and_test(&processqueue_count))
+		wake_up(&processqueue_wq);
 	spin_unlock_bh(&processqueue_lock);
 
 	for (;;) {
@@ -895,7 +897,8 @@ static void process_dlm_messages(struct work_struct *work)
 		}
 
 		list_del(&pentry->list);
-		atomic_dec(&processqueue_count);
+		if (atomic_dec_and_test(&processqueue_count))
+			wake_up(&processqueue_wq);
 		spin_unlock_bh(&processqueue_lock);
 	}
 }
@@ -1511,7 +1514,20 @@ static void process_recv_sockets(struct work_struct *work)
 		/* CF_RECV_PENDING cleared */
 		break;
 	case DLM_IO_FLUSH:
-		flush_workqueue(process_workqueue);
+		/* we can't flush the process_workqueue here because flushing
+		 * a non-WQ_MEM_RECLAIM workqueue such as process_workqueue
+		 * from a WQ_MEM_RECLAIM workqueue can deadlock. Instead we
+		 * use a waitqueue to wait until all messages have been
+		 * processed.
+		 *
+		 * This handling is only necessary to back off the sender and
+		 * avoid queueing all messages from the socket layer into the
+		 * DLM processqueue. Once DLM can parse multiple messages on
+		 * an e.g. per-socket basis, this handling might be removed.
+		 * Especially in a message burst we are too slow to process
+		 * messages and the queue would fill up memory.
+		 */
+		wait_event(processqueue_wq, !atomic_read(&processqueue_count));
 		fallthrough;
 	case DLM_IO_RESCHED:
 		cond_resched();
@@ -1701,11 +1717,7 @@ static int work_start(void)
 		return -ENOMEM;
 	}
 
-	/* ordered dlm message process queue,
-	 * should be converted to a tasklet
-	 */
-	process_workqueue = alloc_ordered_workqueue("dlm_process",
-						    WQ_HIGHPRI | WQ_MEM_RECLAIM);
+	process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0);
 	if (!process_workqueue) {
 		log_print("can't start dlm_process");
 		destroy_workqueue(io_workqueue);
-- 
2.43.0




Thread overview: 16+ messages
2024-04-02 19:17 [PATCHv4 dlm/next 00/15] dlm: bring message parsing to softirq context Alexander Aring
2024-04-02 19:17 ` [PATCHv4 dlm/next 01/15] dlm: Simplify the allocation of slab caches in dlm_lowcomms_msg_cache_create Alexander Aring
2024-04-02 19:17 ` [PATCHv4 dlm/next 02/15] dlm: remove allocation parameter in msg allocation Alexander Aring
2024-04-02 19:17 ` [PATCHv4 dlm/next 03/15] dlm: switch to GFP_ATOMIC in dlm allocations Alexander Aring
2024-04-02 19:17 ` [PATCHv4 dlm/next 04/15] dlm: move root_list functionality to recover.c Alexander Aring
2024-04-02 19:18 ` [PATCHv4 dlm/next 05/15] dlm: move master dir dump to own list Alexander Aring
2024-04-02 19:18 ` [PATCHv4 dlm/next 06/15] dlm: move root_list to ls_recover() stack Alexander Aring
2024-04-02 19:18 ` [PATCHv4 dlm/next 07/15] dlm: implement directory dump context Alexander Aring
2024-04-02 19:18 ` [PATCHv4 dlm/next 08/15] dlm: drop holding waiters mutex in waiters recovery Alexander Aring
2024-04-02 19:18 ` [PATCHv4 dlm/next 09/15] dlm: convert ls_waiters_mutex to spinlock Alexander Aring
2024-04-02 19:18 ` [PATCHv4 dlm/next 10/15] dlm: convert res_lock " Alexander Aring
2024-04-02 19:18 ` [PATCHv4 dlm/next 11/15] dlm: make requestqueue handling non sleepable Alexander Aring
2024-04-02 19:18 ` [PATCHv4 dlm/next 12/15] dlm: ls_recv_active semaphore to rwlock Alexander Aring
2024-04-02 19:18 ` [PATCHv4 dlm/next 13/15] dlm: remove schedule in dlm receive path Alexander Aring
2024-04-02 19:18 ` [PATCHv4 dlm/next 14/15] dlm: convert message parsing locks to disable bh Alexander Aring
2024-04-02 19:18 ` [PATCHv4 dlm/next 15/15] dlm: do dlm message processing in softirq context Alexander Aring
