* [Cluster-devel] [RFC dlm/next 01/10] fs: dlm: remove allocation parameter in msg allocation
@ 2023-09-08 20:46 Alexander Aring
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 02/10] fs: dlm: switch to GFP_ATOMIC in dlm allocations Alexander Aring
` (8 more replies)
0 siblings, 9 replies; 10+ messages in thread
From: Alexander Aring @ 2023-09-08 20:46 UTC (permalink / raw)
To: teigland; +Cc: cluster-devel, gfs2
This patch removes the allocation context parameter for message
allocations and replaces it by always using GFP_ATOMIC. We are preparing
to process dlm messages in softirq context, where we cannot sleep, so it
is necessary to switch to GFP_ATOMIC allocations. To simplify the code
overall we drop the allocation flag and hardcode GFP_ATOMIC in the
allocation functions.
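For illustration, a minimal caller-side sketch of the resulting API
(the send_example() wrapper is hypothetical and not part of the patch;
dlm_lowcomms_new_msg(), dlm_lowcomms_commit_msg() and
dlm_lowcomms_put_msg() follow the diff below):

static int send_example(int nodeid, int len)
{
	struct dlm_msg *msg;
	char *ppc;

	/* allocation is now always GFP_ATOMIC internally: it never
	 * sleeps, so it is safe in softirq context, but it can fail
	 * under memory pressure and callers must handle NULL
	 */
	msg = dlm_lowcomms_new_msg(nodeid, len, &ppc, NULL, NULL);
	if (!msg)
		return -ENOMEM;

	/* ... write the message into ppc, then commit ... */
	dlm_lowcomms_commit_msg(msg);
	dlm_lowcomms_put_msg(msg);
	return 0;
}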
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 31 ++++++++++++-------------------
fs/dlm/lowcomms.c | 16 +++++++---------
fs/dlm/lowcomms.h | 5 ++---
fs/dlm/memory.c | 8 ++++----
fs/dlm/memory.h | 4 ++--
fs/dlm/midcomms.c | 24 ++++++++++--------------
fs/dlm/midcomms.h | 3 +--
fs/dlm/rcom.c | 7 +++----
8 files changed, 41 insertions(+), 57 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 652c51fbbf76..44ea65a73f3b 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3329,8 +3329,7 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
static int _create_message(struct dlm_ls *ls, int mb_len,
int to_nodeid, int mstype,
struct dlm_message **ms_ret,
- struct dlm_mhandle **mh_ret,
- gfp_t allocation)
+ struct dlm_mhandle **mh_ret)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
@@ -3340,7 +3339,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
pass into midcomms_commit and a message buffer (mb) that we
write our data into */
- mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, allocation, &mb);
+ mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
if (!mh)
return -ENOBUFS;
@@ -3362,8 +3361,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
int to_nodeid, int mstype,
struct dlm_message **ms_ret,
- struct dlm_mhandle **mh_ret,
- gfp_t allocation)
+ struct dlm_mhandle **mh_ret)
{
int mb_len = sizeof(struct dlm_message);
@@ -3384,7 +3382,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
}
return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
- ms_ret, mh_ret, allocation);
+ ms_ret, mh_ret);
}
/* further lowcomms enhancements or alternate implementations may make
@@ -3453,7 +3451,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
if (error)
return error;
- error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
+ error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
if (error)
goto fail;
@@ -3513,8 +3511,7 @@ static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
to_nodeid = lkb->lkb_nodeid;
- error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh,
- GFP_NOFS);
+ error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
if (error)
goto out;
@@ -3535,8 +3532,7 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
to_nodeid = lkb->lkb_nodeid;
- error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh,
- GFP_NOFS);
+ error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
if (error)
goto out;
@@ -3561,8 +3557,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
if (error)
return error;
- error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh,
- GFP_NOFS);
+ error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
if (error)
goto fail;
@@ -3586,8 +3581,7 @@ static int send_remove(struct dlm_rsb *r)
to_nodeid = dlm_dir_nodeid(r);
- error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh,
- GFP_ATOMIC);
+ error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
if (error)
goto out;
@@ -3608,7 +3602,7 @@ static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
to_nodeid = lkb->lkb_nodeid;
- error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
+ error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
if (error)
goto out;
@@ -3650,8 +3644,7 @@ static int send_lookup_reply(struct dlm_ls *ls,
struct dlm_mhandle *mh;
int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
- error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh,
- GFP_NOFS);
+ error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
if (error)
goto out;
@@ -6063,7 +6056,7 @@ static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
int error;
error = _create_message(ls, sizeof(struct dlm_message), nodeid,
- DLM_MSG_PURGE, &ms, &mh, GFP_NOFS);
+ DLM_MSG_PURGE, &ms, &mh);
if (error)
return error;
ms->m_nodeid = cpu_to_le32(nodeid);
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 67f8dd8a05ef..db71982d709d 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1229,14 +1229,13 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
};
static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
- gfp_t allocation, char **ppc,
- void (*cb)(void *data),
+ char **ppc, void (*cb)(void *data),
void *data)
{
struct writequeue_entry *e;
struct dlm_msg *msg;
- msg = dlm_allocate_msg(allocation);
+ msg = dlm_allocate_msg();
if (!msg)
return NULL;
@@ -1261,9 +1260,8 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
* dlm_lowcomms_commit_msg which is a must call if success
*/
#ifndef __CHECKER__
-struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
- char **ppc, void (*cb)(void *data),
- void *data)
+struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
+ void (*cb)(void *data), void *data)
{
struct connection *con;
struct dlm_msg *msg;
@@ -1284,7 +1282,7 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
return NULL;
}
- msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
+ msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data);
if (!msg) {
srcu_read_unlock(&connections_srcu, idx);
return NULL;
@@ -1348,8 +1346,8 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
if (msg->retransmit)
return 1;
- msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
- GFP_ATOMIC, &ppc, NULL, NULL);
+ msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc,
+ NULL, NULL);
if (!msg_resend)
return -ENOMEM;
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 3e8dca66183b..8deb16f8f620 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -39,9 +39,8 @@ void dlm_lowcomms_stop(void);
void dlm_lowcomms_init(void);
void dlm_lowcomms_exit(void);
int dlm_lowcomms_close(int nodeid);
-struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
- char **ppc, void (*cb)(void *data),
- void *data);
+struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
+ void (*cb)(void *data), void *data);
void dlm_lowcomms_commit_msg(struct dlm_msg *msg);
void dlm_lowcomms_put_msg(struct dlm_msg *msg);
int dlm_lowcomms_resend_msg(struct dlm_msg *msg);
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index 64f212a066cf..c0c1a83f6381 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -134,9 +134,9 @@ void dlm_free_lkb(struct dlm_lkb *lkb)
kmem_cache_free(lkb_cache, lkb);
}
-struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation)
+struct dlm_mhandle *dlm_allocate_mhandle(void)
{
- return kmem_cache_alloc(mhandle_cache, allocation);
+ return kmem_cache_alloc(mhandle_cache, GFP_ATOMIC);
}
void dlm_free_mhandle(struct dlm_mhandle *mhandle)
@@ -154,9 +154,9 @@ void dlm_free_writequeue(struct writequeue_entry *writequeue)
kmem_cache_free(writequeue_cache, writequeue);
}
-struct dlm_msg *dlm_allocate_msg(gfp_t allocation)
+struct dlm_msg *dlm_allocate_msg(void)
{
- return kmem_cache_alloc(msg_cache, allocation);
+ return kmem_cache_alloc(msg_cache, GFP_ATOMIC);
}
void dlm_free_msg(struct dlm_msg *msg)
diff --git a/fs/dlm/memory.h b/fs/dlm/memory.h
index 6b29563d24f7..15198d46b42a 100644
--- a/fs/dlm/memory.h
+++ b/fs/dlm/memory.h
@@ -20,11 +20,11 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls);
void dlm_free_lkb(struct dlm_lkb *l);
char *dlm_allocate_lvb(struct dlm_ls *ls);
void dlm_free_lvb(char *l);
-struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation);
+struct dlm_mhandle *dlm_allocate_mhandle(void);
void dlm_free_mhandle(struct dlm_mhandle *mhandle);
struct writequeue_entry *dlm_allocate_writequeue(void);
void dlm_free_writequeue(struct writequeue_entry *writequeue);
-struct dlm_msg *dlm_allocate_msg(gfp_t allocation);
+struct dlm_msg *dlm_allocate_msg(void);
void dlm_free_msg(struct dlm_msg *msg);
struct dlm_callback *dlm_allocate_cb(void);
void dlm_free_cb(struct dlm_callback *cb);
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 2247ebb61be1..ea0559e2a44d 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -380,8 +380,7 @@ static int dlm_send_ack(int nodeid, uint32_t seq)
struct dlm_msg *msg;
char *ppc;
- msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_ATOMIC, &ppc,
- NULL, NULL);
+ msg = dlm_lowcomms_new_msg(nodeid, mb_len, &ppc, NULL, NULL);
if (!msg)
return -ENOMEM;
@@ -429,7 +428,7 @@ static int dlm_send_fin(struct midcomms_node *node,
struct dlm_mhandle *mh;
char *ppc;
- mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_ATOMIC, &ppc);
+ mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, &ppc);
if (!mh)
return -ENOMEM;
@@ -977,13 +976,13 @@ static void midcomms_new_msg_cb(void *data)
}
static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
- int len, gfp_t allocation, char **ppc)
+ int len, char **ppc)
{
struct dlm_opts *opts;
struct dlm_msg *msg;
msg = dlm_lowcomms_new_msg(nodeid, len + DLM_MIDCOMMS_OPT_LEN,
- allocation, ppc, midcomms_new_msg_cb, mh);
+ ppc, midcomms_new_msg_cb, mh);
if (!msg)
return NULL;
@@ -1002,8 +1001,7 @@ static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int node
* dlm_midcomms_commit_mhandle which is a must call if success
*/
#ifndef __CHECKER__
-struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
- gfp_t allocation, char **ppc)
+struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, char **ppc)
{
struct midcomms_node *node;
struct dlm_mhandle *mh;
@@ -1018,7 +1016,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
/* this is a bug, however we going on and hope it will be resolved */
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
- mh = dlm_allocate_mhandle(allocation);
+ mh = dlm_allocate_mhandle();
if (!mh)
goto err;
@@ -1029,8 +1027,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
switch (node->version) {
case DLM_VERSION_3_1:
- msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc,
- NULL, NULL);
+ msg = dlm_lowcomms_new_msg(nodeid, len, ppc, NULL, NULL);
if (!msg) {
dlm_free_mhandle(mh);
goto err;
@@ -1041,8 +1038,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
/* send ack back if necessary */
dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
- msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
- ppc);
+ msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, ppc);
if (!msg) {
dlm_free_mhandle(mh);
goto err;
@@ -1502,8 +1498,8 @@ int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf,
rd.node = node;
rd.buf = buf;
- msg = dlm_lowcomms_new_msg(node->nodeid, buflen, GFP_NOFS,
- &msgbuf, midcomms_new_rawmsg_cb, &rd);
+ msg = dlm_lowcomms_new_msg(node->nodeid, buflen, &msgbuf,
+ midcomms_new_rawmsg_cb, &rd);
if (!msg)
return -ENOMEM;
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index e7246fb3ef57..278d26fdeb2c 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -16,8 +16,7 @@ struct midcomms_node;
int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len);
int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
-struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
- gfp_t allocation, char **ppc);
+struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, char **ppc);
void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
int namelen);
int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 3b734aed26b5..2e3f529f3ff2 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -55,7 +55,7 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
struct dlm_mhandle *mh;
char *mb;
- mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
+ mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
if (!mh) {
log_print("%s to %d type %d len %d ENOBUFS",
__func__, to_nodeid, type, len);
@@ -75,8 +75,7 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
struct dlm_msg *msg;
char *mb;
- msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb,
- NULL, NULL);
+ msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, &mb, NULL, NULL);
if (!msg) {
log_print("create_rcom to %d type %d len %d ENOBUFS",
to_nodeid, type, len);
@@ -510,7 +509,7 @@ int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in)
char *mb;
int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
- mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb);
+ mh = dlm_midcomms_get_mhandle(nodeid, mb_len, &mb);
if (!mh)
return -ENOBUFS;
--
2.31.1
* [Cluster-devel] [RFC dlm/next 02/10] fs: dlm: switch to GFP_ATOMIC in dlm allocations
2023-09-08 20:46 [Cluster-devel] [RFC dlm/next 01/10] fs: dlm: remove allocation parameter in msg allocation Alexander Aring
@ 2023-09-08 20:46 ` Alexander Aring
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 03/10] fs: dlm: remove explicit scheduling points Alexander Aring
` (7 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Alexander Aring @ 2023-09-08 20:46 UTC (permalink / raw)
To: teigland; +Cc: cluster-devel, gfs2
This patch prepares to process dlm messages in softirq context. To
parse messages in softirq context, some code paths either run inside
softirq context themselves or need to run while a spinlock is held.
This patch switches the allocation context to GFP_ATOMIC for those
places. It is no longer possible to preload idr allocations; however,
preloading is only a performance optimization, and we might switch to
an xarray implementation with more lockless reader paradigms.
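For illustration, the resulting idr pattern (a sketch of the
_create_lkb() hunk below; without idr_preload() there is no per-cpu
backup buffer, so the GFP_NOWAIT allocation can fail more often and
the failure surfaces to the caller):

	spin_lock(&ls->ls_lkbidr_spin);
	/* GFP_NOWAIT: no sleeping and no preloaded buffer to fall
	 * back on; rv < 0 on allocation failure
	 */
	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
	if (rv >= 0)
		lkb->lkb_id = rv;
	spin_unlock(&ls->ls_lkbidr_spin);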
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 2 --
fs/dlm/memory.c | 6 +++---
fs/dlm/recover.c | 2 --
fs/dlm/requestqueue.c | 2 +-
4 files changed, 4 insertions(+), 8 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 44ea65a73f3b..970b8499b66f 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1208,13 +1208,11 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
spin_lock_init(&lkb->lkb_cb_lock);
INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
- idr_preload(GFP_NOFS);
spin_lock(&ls->ls_lkbidr_spin);
rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
if (rv >= 0)
lkb->lkb_id = rv;
spin_unlock(&ls->ls_lkbidr_spin);
- idr_preload_end();
if (rv < 0) {
log_error(ls, "create_lkb idr error %d", rv);
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index c0c1a83f6381..f44532d9f5c8 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -84,7 +84,7 @@ char *dlm_allocate_lvb(struct dlm_ls *ls)
{
char *p;
- p = kzalloc(ls->ls_lvblen, GFP_NOFS);
+ p = kzalloc(ls->ls_lvblen, GFP_ATOMIC);
return p;
}
@@ -97,7 +97,7 @@ struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
{
struct dlm_rsb *r;
- r = kmem_cache_zalloc(rsb_cache, GFP_NOFS);
+ r = kmem_cache_zalloc(rsb_cache, GFP_ATOMIC);
return r;
}
@@ -112,7 +112,7 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
{
struct dlm_lkb *lkb;
- lkb = kmem_cache_zalloc(lkb_cache, GFP_NOFS);
+ lkb = kmem_cache_zalloc(lkb_cache, GFP_ATOMIC);
return lkb;
}
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 53917c0aa3c0..ce6dc914cb86 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -310,7 +310,6 @@ static int recover_idr_add(struct dlm_rsb *r)
struct dlm_ls *ls = r->res_ls;
int rv;
- idr_preload(GFP_NOFS);
spin_lock(&ls->ls_recover_idr_lock);
if (r->res_id) {
rv = -1;
@@ -326,7 +325,6 @@ static int recover_idr_add(struct dlm_rsb *r)
rv = 0;
out_unlock:
spin_unlock(&ls->ls_recover_idr_lock);
- idr_preload_end();
return rv;
}
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 892d6ca21e74..c05940afd063 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -37,7 +37,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
int length = le16_to_cpu(ms->m_header.h_length) -
sizeof(struct dlm_message);
- e = kmalloc(sizeof(struct rq_entry) + length, GFP_NOFS);
+ e = kmalloc(sizeof(struct rq_entry) + length, GFP_ATOMIC);
if (!e) {
log_print("dlm_add_requestqueue: out of memory len %d", length);
return;
--
2.31.1
* [Cluster-devel] [RFC dlm/next 03/10] fs: dlm: remove explicit scheduling points
2023-09-08 20:46 [Cluster-devel] [RFC dlm/next 01/10] fs: dlm: remove allocation parameter in msg allocation Alexander Aring
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 02/10] fs: dlm: switch to GFP_ATOMIC in dlm allocations Alexander Aring
@ 2023-09-08 20:46 ` Alexander Aring
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 04/10] fs: dlm: convert ls_waiters_mutex to spinlock Alexander Aring
` (6 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Alexander Aring @ 2023-09-08 20:46 UTC (permalink / raw)
To: teigland; +Cc: cluster-devel, gfs2
This patch prepares to convert some locks to spinlocks. For that we
need to remove explicit scheduling points that would otherwise run
while a spinlock is held. With fewer scheduling points we are less
cooperative towards other tasks; we need to see whether removing them
causes problems and, if so, find other solutions.
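For illustration, the constraint driving this change (a minimal
sketch, not code from the patch):

	spin_lock(&lock);
	list_for_each_entry(r, &head, list) {
		/* ... recovery work per entry ... */
		cond_resched();	/* BUG: scheduling while atomic; a
				 * spinlock holder must not sleep, so
				 * these calls have to go before the
				 * lock conversion
				 */
	}
	spin_unlock(&lock);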
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 2 --
fs/dlm/recover.c | 1 -
fs/dlm/requestqueue.c | 1 -
3 files changed, 4 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 970b8499b66f..61eb285c613c 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -4979,7 +4979,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
log_error(ls, "invalid lkb wait_type %d %d",
lkb->lkb_wait_type, wait_type);
}
- schedule();
}
mutex_unlock(&ls->ls_waiters_mutex);
kfree(ms_local);
@@ -5218,7 +5217,6 @@ void dlm_recover_purge(struct dlm_ls *ls)
}
unlock_rsb(r);
unhold_rsb(r);
- cond_resched();
}
up_write(&ls->ls_root_sem);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index ce6dc914cb86..752002304ca9 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -543,7 +543,6 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
else
error = recover_master(r, &count, seq);
unlock_rsb(r);
- cond_resched();
total++;
if (error) {
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index c05940afd063..ef7b7c8d6907 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -106,7 +106,6 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
error = -EINTR;
break;
}
- schedule();
}
return error;
--
2.31.1
* [Cluster-devel] [RFC dlm/next 04/10] fs: dlm: convert ls_waiters_mutex to spinlock
2023-09-08 20:46 [Cluster-devel] [RFC dlm/next 01/10] fs: dlm: remove allocation parameter in msg allocation Alexander Aring
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 02/10] fs: dlm: switch to GFP_ATOMIC in dlm allocations Alexander Aring
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 03/10] fs: dlm: remove explicit scheduling points Alexander Aring
@ 2023-09-08 20:46 ` Alexander Aring
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 05/10] fs: dlm: convert res_lock " Alexander Aring
` (5 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Alexander Aring @ 2023-09-08 20:46 UTC (permalink / raw)
To: teigland; +Cc: cluster-devel, gfs2
This patch converts the per-lockspace waiters lock from a mutex
(ls_waiters_mutex) to a spinlock (ls_waiters_lock).
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/debug_fs.c | 4 ++--
fs/dlm/dlm_internal.h | 2 +-
fs/dlm/lock.c | 33 +++++++++++++++++----------------
fs/dlm/lockspace.c | 2 +-
4 files changed, 21 insertions(+), 20 deletions(-)
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 698d6b7a20f8..9d726971ba47 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -824,7 +824,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
size_t len = DLM_DEBUG_BUF_LEN, pos = 0, ret, rv;
mutex_lock(&debug_buf_lock);
- mutex_lock(&ls->ls_waiters_mutex);
+ spin_lock(&ls->ls_waiters_lock);
memset(debug_buf, 0, sizeof(debug_buf));
list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
@@ -835,7 +835,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
break;
pos += ret;
}
- mutex_unlock(&ls->ls_waiters_mutex);
+ spin_unlock(&ls->ls_waiters_lock);
rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
mutex_unlock(&debug_buf_lock);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index dfc444dad329..2f32e053a9dc 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -564,7 +564,7 @@ struct dlm_ls {
struct dlm_rsbtable *ls_rsbtbl;
uint32_t ls_rsbtbl_size;
- struct mutex ls_waiters_mutex;
+ spinlock_t ls_waiters_lock;
struct list_head ls_waiters; /* lkbs needing a reply */
struct mutex ls_orphans_mutex;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 61eb285c613c..5631623ce01a 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1407,7 +1407,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
int error = 0;
int wc;
- mutex_lock(&ls->ls_waiters_mutex);
+ spin_lock(&ls->ls_waiters_lock);
if (is_overlap_unlock(lkb) ||
(is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
@@ -1447,7 +1447,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
log_error(ls, "addwait error %x %d flags %x %d %d %s",
lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
lkb->lkb_wait_type, lkb->lkb_resource->res_name);
- mutex_unlock(&ls->ls_waiters_mutex);
+ spin_unlock(&ls->ls_waiters_lock);
return error;
}
@@ -1546,14 +1546,15 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error;
- mutex_lock(&ls->ls_waiters_mutex);
+ spin_lock(&ls->ls_waiters_lock);
error = _remove_from_waiters(lkb, mstype, NULL);
- mutex_unlock(&ls->ls_waiters_mutex);
+ spin_unlock(&ls->ls_waiters_lock);
return error;
}
/* Handles situations where we might be processing a "fake" or "local" reply in
- which we can't try to take waiters_mutex again. */
+ * which we can't try to take waiters_lock again.
+ */
static int remove_from_waiters_ms(struct dlm_lkb *lkb,
const struct dlm_message *ms, bool local)
@@ -1562,10 +1563,10 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
int error;
if (!local)
- mutex_lock(&ls->ls_waiters_mutex);
+ spin_lock(&ls->ls_waiters_lock);
error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
if (!local)
- mutex_unlock(&ls->ls_waiters_mutex);
+ spin_unlock(&ls->ls_waiters_lock);
return error;
}
@@ -4395,7 +4396,7 @@ static void _receive_convert_reply(struct dlm_lkb *lkb,
if (error)
goto out;
- /* local reply can happen with waiters_mutex held */
+ /* local reply can happen with waiters_lock held */
error = remove_from_waiters_ms(lkb, ms, local);
if (error)
goto out;
@@ -4434,7 +4435,7 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb,
if (error)
goto out;
- /* local reply can happen with waiters_mutex held */
+ /* local reply can happen with waiters_lock held */
error = remove_from_waiters_ms(lkb, ms, local);
if (error)
goto out;
@@ -4486,7 +4487,7 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb,
if (error)
goto out;
- /* local reply can happen with waiters_mutex held */
+ /* local reply can happen with waiters_lock held */
error = remove_from_waiters_ms(lkb, ms, local);
if (error)
goto out;
@@ -4887,7 +4888,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
if (!ms_local)
return;
- mutex_lock(&ls->ls_waiters_mutex);
+ spin_lock(&ls->ls_waiters_lock);
list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
@@ -4980,7 +4981,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
lkb->lkb_wait_type, wait_type);
}
}
- mutex_unlock(&ls->ls_waiters_mutex);
+ spin_unlock(&ls->ls_waiters_lock);
kfree(ms_local);
}
@@ -4988,7 +4989,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
{
struct dlm_lkb *lkb = NULL, *iter;
- mutex_lock(&ls->ls_waiters_mutex);
+ spin_lock(&ls->ls_waiters_lock);
list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
hold_lkb(iter);
@@ -4996,7 +4997,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
break;
}
}
- mutex_unlock(&ls->ls_waiters_mutex);
+ spin_unlock(&ls->ls_waiters_lock);
return lkb;
}
@@ -5063,9 +5064,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
while (!atomic_dec_and_test(&lkb->lkb_wait_count))
unhold_lkb(lkb);
- mutex_lock(&ls->ls_waiters_mutex);
+ spin_lock(&ls->ls_waiters_lock);
list_del_init(&lkb->lkb_wait_reply);
- mutex_unlock(&ls->ls_waiters_mutex);
+ spin_unlock(&ls->ls_waiters_lock);
if (oc || ou) {
/* do an unlock or cancel instead of resending */
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 0455dddb0797..79dd516bde5f 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -515,7 +515,7 @@ static int new_lockspace(const char *name, const char *cluster,
spin_lock_init(&ls->ls_lkbidr_spin);
INIT_LIST_HEAD(&ls->ls_waiters);
- mutex_init(&ls->ls_waiters_mutex);
+ spin_lock_init(&ls->ls_waiters_lock);
INIT_LIST_HEAD(&ls->ls_orphans);
mutex_init(&ls->ls_orphans_mutex);
--
2.31.1
* [Cluster-devel] [RFC dlm/next 05/10] fs: dlm: convert res_lock to spinlock
2023-09-08 20:46 [Cluster-devel] [RFC dlm/next 01/10] fs: dlm: remove allocation parameter in msg allocation Alexander Aring
` (2 preceding siblings ...)
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 04/10] fs: dlm: convert ls_waiters_mutex to spinlock Alexander Aring
@ 2023-09-08 20:46 ` Alexander Aring
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 06/10] fs: dlm: make requestqueue handling non sleepable Alexander Aring
` (4 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Alexander Aring @ 2023-09-08 20:46 UTC (permalink / raw)
To: teigland; +Cc: cluster-devel, gfs2
This patch converts the per-rsb lock from a mutex (res_mutex) to a
spinlock (res_lock).
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/dlm_internal.h | 2 +-
fs/dlm/lock.c | 2 +-
fs/dlm/lock.h | 4 ++--
3 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 2f32e053a9dc..383b26144a31 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -290,7 +290,7 @@ struct dlm_lkb {
struct dlm_rsb {
struct dlm_ls *res_ls; /* the lockspace */
struct kref res_ref;
- struct mutex res_mutex;
+ spinlock_t res_lock;
unsigned long res_flags;
int res_length; /* length of rsb name */
int res_nodeid;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 5631623ce01a..3d17f4d6b765 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -415,7 +415,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
r->res_ls = ls;
r->res_length = len;
memcpy(r->res_name, name, len);
- mutex_init(&r->res_mutex);
+ spin_lock_init(&r->res_lock);
INIT_LIST_HEAD(&r->res_lookup);
INIT_LIST_HEAD(&r->res_grantqueue);
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index b54e2cbbe6e2..1428fa3482d7 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -68,12 +68,12 @@ static inline int is_master(struct dlm_rsb *r)
static inline void lock_rsb(struct dlm_rsb *r)
{
- mutex_lock(&r->res_mutex);
+ spin_lock(&r->res_lock);
}
static inline void unlock_rsb(struct dlm_rsb *r)
{
- mutex_unlock(&r->res_mutex);
+ spin_unlock(&r->res_lock);
}
#endif
--
2.31.1
* [Cluster-devel] [RFC dlm/next 06/10] fs: dlm: make requestqueue handling non sleepable
2023-09-08 20:46 [Cluster-devel] [RFC dlm/next 01/10] fs: dlm: remove allocation parameter in msg allocation Alexander Aring
` (3 preceding siblings ...)
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 05/10] fs: dlm: convert res_lock " Alexander Aring
@ 2023-09-08 20:46 ` Alexander Aring
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 07/10] fs: dlm: ls_root_lock semaphore to rwlock Alexander Aring
` (3 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Alexander Aring @ 2023-09-08 20:46 UTC (permalink / raw)
To: teigland; +Cc: cluster-devel, gfs2
This patch removes ls_requestqueue_wait and converts
ls_requestqueue_mutex to an rwlock. Instead of calling wait_event() in
dlm message processing, which waits until all saved messages are
processed before allowing new messages to be processed after recovery
is done, this patch uses a bit flag to signal whether an incoming
message should be saved for later. When recovery has processed all
saved messages, the bit is cleared again and new messages are processed
directly.
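For illustration, a condensed sketch of the receive-side flow this
introduces (it omits the old-generation check and the recheck/retry
loop of the full dlm_receive_message() hunk below):

	read_lock(&ls->ls_requestqueue_lock);
	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
		/* recovery active: upgrade to the write lock and
		 * queue the message for later processing
		 */
		read_unlock(&ls->ls_requestqueue_lock);
		write_lock(&ls->ls_requestqueue_lock);
		dlm_add_requestqueue(ls, nodeid, ms);
		write_unlock(&ls->ls_requestqueue_lock);
	} else {
		/* normal operation: process directly, the read lock
		 * only excludes concurrent recovery state changes
		 */
		_receive_message(ls, ms, 0);
		read_unlock(&ls->ls_requestqueue_lock);
	}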
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/dlm_internal.h | 5 ++---
fs/dlm/lock.c | 16 ++++++++++++++--
fs/dlm/lockspace.c | 4 +---
fs/dlm/member.c | 5 +++++
fs/dlm/requestqueue.c | 39 ++++++---------------------------------
5 files changed, 28 insertions(+), 41 deletions(-)
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 383b26144a31..65db6f834f04 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -624,9 +624,7 @@ struct dlm_ls {
struct rw_semaphore ls_in_recovery; /* block local requests */
struct rw_semaphore ls_recv_active; /* block dlm_recv */
struct list_head ls_requestqueue;/* queue remote requests */
- atomic_t ls_requestqueue_cnt;
- wait_queue_head_t ls_requestqueue_wait;
- struct mutex ls_requestqueue_mutex;
+ rwlock_t ls_requestqueue_lock;
struct dlm_rcom *ls_recover_buf;
int ls_recover_nodeid; /* for debugging */
unsigned int ls_recover_dir_sent_res; /* for log info */
@@ -686,6 +684,7 @@ struct dlm_ls {
#define LSFL_UEVENT_WAIT 7
#define LSFL_CB_DELAY 9
#define LSFL_NODIR 10
+#define LSFL_RECV_MSG_BLOCKED 11
/* much of this is just saving user space pointers associated with the
lock that we pass back to the user lib with an ast */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 3d17f4d6b765..989603643c03 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -4746,20 +4746,32 @@ static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
int nodeid)
{
- if (dlm_locking_stopped(ls)) {
+try_again:
+ read_lock(&ls->ls_requestqueue_lock);
+ if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
/* If we were a member of this lockspace, left, and rejoined,
other nodes may still be sending us messages from the
lockspace generation before we left. */
if (WARN_ON_ONCE(!ls->ls_generation)) {
+ read_unlock(&ls->ls_requestqueue_lock);
log_limit(ls, "receive %d from %d ignore old gen",
le32_to_cpu(ms->m_type), nodeid);
return;
}
+ read_unlock(&ls->ls_requestqueue_lock);
+ write_lock(&ls->ls_requestqueue_lock);
+ /* recheck because we now hold the write lock */
+ if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
+ write_unlock(&ls->ls_requestqueue_lock);
+ goto try_again;
+ }
+
dlm_add_requestqueue(ls, nodeid, ms);
+ write_unlock(&ls->ls_requestqueue_lock);
} else {
- dlm_wait_requestqueue(ls);
_receive_message(ls, ms, 0);
+ read_unlock(&ls->ls_requestqueue_lock);
}
}
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 79dd516bde5f..b2cb0621031f 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -554,9 +554,7 @@ static int new_lockspace(const char *name, const char *cluster,
init_rwsem(&ls->ls_in_recovery);
init_rwsem(&ls->ls_recv_active);
INIT_LIST_HEAD(&ls->ls_requestqueue);
- atomic_set(&ls->ls_requestqueue_cnt, 0);
- init_waitqueue_head(&ls->ls_requestqueue_wait);
- mutex_init(&ls->ls_requestqueue_mutex);
+ rwlock_init(&ls->ls_requestqueue_lock);
spin_lock_init(&ls->ls_clear_proc_locks);
/* Due backwards compatibility with 3.1 we need to use maximum
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index be7909ead71b..707cebcdc533 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -642,6 +642,11 @@ int dlm_ls_stop(struct dlm_ls *ls)
set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
ls->ls_recover_seq++;
+
+ /* activate requestqueue and stop processing */
+ write_lock(&ls->ls_requestqueue_lock);
+ set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
+ write_unlock(&ls->ls_requestqueue_lock);
spin_unlock(&ls->ls_recover_lock);
/*
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index ef7b7c8d6907..8a57a2d70561 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -48,10 +48,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
memcpy(&e->request, ms, sizeof(*ms));
memcpy(&e->request.m_extra, ms->m_extra, length);
- atomic_inc(&ls->ls_requestqueue_cnt);
- mutex_lock(&ls->ls_requestqueue_mutex);
list_add_tail(&e->list, &ls->ls_requestqueue);
- mutex_unlock(&ls->ls_requestqueue_mutex);
}
/*
@@ -71,16 +68,14 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
struct dlm_message *ms;
int error = 0;
- mutex_lock(&ls->ls_requestqueue_mutex);
-
+ write_lock(&ls->ls_requestqueue_lock);
for (;;) {
if (list_empty(&ls->ls_requestqueue)) {
- mutex_unlock(&ls->ls_requestqueue_mutex);
+ clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
error = 0;
break;
}
- e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
- mutex_unlock(&ls->ls_requestqueue_mutex);
+ e = list_first_entry(&ls->ls_requestqueue, struct rq_entry, list);
ms = &e->request;
@@ -93,40 +88,20 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
e->recover_seq);
dlm_receive_message_saved(ls, &e->request, e->recover_seq);
-
- mutex_lock(&ls->ls_requestqueue_mutex);
list_del(&e->list);
- if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
- wake_up(&ls->ls_requestqueue_wait);
kfree(e);
if (dlm_locking_stopped(ls)) {
log_debug(ls, "process_requestqueue abort running");
- mutex_unlock(&ls->ls_requestqueue_mutex);
error = -EINTR;
break;
}
}
+ write_unlock(&ls->ls_requestqueue_lock);
return error;
}
-/*
- * After recovery is done, locking is resumed and dlm_recoverd takes all the
- * saved requests and processes them as they would have been by dlm_recv. At
- * the same time, dlm_recv will start receiving new requests from remote nodes.
- * We want to delay dlm_recv processing new requests until dlm_recoverd has
- * finished processing the old saved requests. We don't check for locking
- * stopped here because dlm_ls_stop won't stop locking until it's suspended us
- * (dlm_recv).
- */
-
-void dlm_wait_requestqueue(struct dlm_ls *ls)
-{
- wait_event(ls->ls_requestqueue_wait,
- atomic_read(&ls->ls_requestqueue_cnt) == 0);
-}
-
static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
{
__le32 type = ms->m_type;
@@ -157,17 +132,15 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
struct dlm_message *ms;
struct rq_entry *e, *safe;
- mutex_lock(&ls->ls_requestqueue_mutex);
+ write_lock(&ls->ls_requestqueue_lock);
list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
ms = &e->request;
if (purge_request(ls, ms, e->nodeid)) {
list_del(&e->list);
- if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
- wake_up(&ls->ls_requestqueue_wait);
kfree(e);
}
}
- mutex_unlock(&ls->ls_requestqueue_mutex);
+ write_unlock(&ls->ls_requestqueue_lock);
}
--
2.31.1
* [Cluster-devel] [RFC dlm/next 07/10] fs: dlm: ls_root_lock semaphore to rwlock
2023-09-08 20:46 [Cluster-devel] [RFC dlm/next 01/10] fs: dlm: remove allocation parameter in msg allocation Alexander Aring
` (4 preceding siblings ...)
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 06/10] fs: dlm: make requestqueue handling non sleepable Alexander Aring
@ 2023-09-08 20:46 ` Alexander Aring
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 08/10] fs: dlm: ls_recv_active " Alexander Aring
` (2 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Alexander Aring @ 2023-09-08 20:46 UTC (permalink / raw)
To: teigland; +Cc: cluster-devel, gfs2
This patch converts the ls_root_sem rw_semaphore to an rwlock
(ls_root_lock) so we do not sleep during dlm message processing.
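A minimal sketch of the resulting read-side pattern (it mirrors
dlm_recover_dir_nodeid() in the diff below; unlike an rw_semaphore,
waiters on an rwlock spin instead of sleeping, so the critical section
must stay short and must not sleep):

	read_lock(&ls->ls_root_lock);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list)
		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
	read_unlock(&ls->ls_root_lock);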
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/dir.c | 14 +++++++-------
fs/dlm/dlm_internal.h | 2 +-
fs/dlm/lock.c | 4 ++--
fs/dlm/lockspace.c | 2 +-
fs/dlm/recover.c | 28 ++++++++++++++--------------
5 files changed, 25 insertions(+), 25 deletions(-)
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index f6acba4310a7..c70e286f3dbc 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -51,11 +51,11 @@ void dlm_recover_dir_nodeid(struct dlm_ls *ls)
{
struct dlm_rsb *r;
- down_read(&ls->ls_root_sem);
+ read_lock(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
}
- up_read(&ls->ls_root_sem);
+ read_unlock(&ls->ls_root_lock);
}
int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
@@ -216,16 +216,16 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
if (!rv)
return r;
- down_read(&ls->ls_root_sem);
+ read_lock(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (len == r->res_length && !memcmp(name, r->res_name, len)) {
- up_read(&ls->ls_root_sem);
+ read_unlock(&ls->ls_root_lock);
log_debug(ls, "find_rsb_root revert to root_list %s",
r->res_name);
return r;
}
}
- up_read(&ls->ls_root_sem);
+ read_unlock(&ls->ls_root_lock);
return NULL;
}
@@ -241,7 +241,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
int offset = 0, dir_nodeid;
__be16 be_namelen;
- down_read(&ls->ls_root_sem);
+ read_lock(&ls->ls_root_lock);
if (inlen > 1) {
r = find_rsb_root(ls, inbuf, inlen);
@@ -302,6 +302,6 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
ls->ls_recover_dir_sent_msg++;
}
out:
- up_read(&ls->ls_root_sem);
+ read_unlock(&ls->ls_root_lock);
}
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 65db6f834f04..9106e20e6c20 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -642,7 +642,7 @@ struct dlm_ls {
spinlock_t ls_clear_proc_locks;
struct list_head ls_root_list; /* root resources */
- struct rw_semaphore ls_root_sem; /* protect root_list */
+ rwlock_t ls_root_lock; /* protect root_list */
const struct dlm_lockspace_ops *ls_ops;
void *ls_ops_arg;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 989603643c03..1031f233a3ad 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -5216,7 +5216,7 @@ void dlm_recover_purge(struct dlm_ls *ls)
if (!nodes_count)
return;
- down_write(&ls->ls_root_sem);
+ write_lock(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
hold_rsb(r);
lock_rsb(r);
@@ -5231,7 +5231,7 @@ void dlm_recover_purge(struct dlm_ls *ls)
unlock_rsb(r);
unhold_rsb(r);
}
- up_write(&ls->ls_root_sem);
+ write_unlock(&ls->ls_root_lock);
if (lkb_count)
log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index b2cb0621031f..265d69752b90 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -579,7 +579,7 @@ static int new_lockspace(const char *name, const char *cluster,
ls->ls_local_handle = ls;
init_waitqueue_head(&ls->ls_wait_general);
INIT_LIST_HEAD(&ls->ls_root_list);
- init_rwsem(&ls->ls_root_sem);
+ rwlock_init(&ls->ls_root_lock);
spin_lock(&lslist_lock);
ls->ls_create_count = 1;
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 752002304ca9..0d5b0f94eb46 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -529,10 +529,10 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
log_rinfo(ls, "dlm_recover_masters");
- down_read(&ls->ls_root_sem);
+ read_lock(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (dlm_recovery_stopped(ls)) {
- up_read(&ls->ls_root_sem);
+ read_unlock(&ls->ls_root_lock);
error = -EINTR;
goto out;
}
@@ -546,11 +546,11 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
total++;
if (error) {
- up_read(&ls->ls_root_sem);
+ read_unlock(&ls->ls_root_lock);
goto out;
}
}
- up_read(&ls->ls_root_sem);
+ read_unlock(&ls->ls_root_lock);
log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);
@@ -660,7 +660,7 @@ int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
struct dlm_rsb *r;
int error, count = 0;
- down_read(&ls->ls_root_sem);
+ read_lock(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (is_master(r)) {
rsb_clear_flag(r, RSB_NEW_MASTER);
@@ -672,19 +672,19 @@ int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
if (dlm_recovery_stopped(ls)) {
error = -EINTR;
- up_read(&ls->ls_root_sem);
+ read_unlock(&ls->ls_root_lock);
goto out;
}
error = recover_locks(r, seq);
if (error) {
- up_read(&ls->ls_root_sem);
+ read_unlock(&ls->ls_root_lock);
goto out;
}
count += r->res_recover_locks_count;
}
- up_read(&ls->ls_root_sem);
+ read_unlock(&ls->ls_root_lock);
log_rinfo(ls, "dlm_recover_locks %d out", count);
@@ -858,7 +858,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
struct dlm_rsb *r;
unsigned int count = 0;
- down_read(&ls->ls_root_sem);
+ read_lock(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
lock_rsb(r);
if (is_master(r)) {
@@ -880,7 +880,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
rsb_clear_flag(r, RSB_NEW_MASTER2);
unlock_rsb(r);
}
- up_read(&ls->ls_root_sem);
+ read_unlock(&ls->ls_root_lock);
if (count)
log_rinfo(ls, "dlm_recover_rsbs %d done", count);
@@ -894,7 +894,7 @@ int dlm_create_root_list(struct dlm_ls *ls)
struct dlm_rsb *r;
int i, error = 0;
- down_write(&ls->ls_root_sem);
+ write_lock(&ls->ls_root_lock);
if (!list_empty(&ls->ls_root_list)) {
log_error(ls, "root list not empty");
error = -EINVAL;
@@ -914,7 +914,7 @@ int dlm_create_root_list(struct dlm_ls *ls)
spin_unlock(&ls->ls_rsbtbl[i].lock);
}
out:
- up_write(&ls->ls_root_sem);
+ write_unlock(&ls->ls_root_lock);
return error;
}
@@ -922,12 +922,12 @@ void dlm_release_root_list(struct dlm_ls *ls)
{
struct dlm_rsb *r, *safe;
- down_write(&ls->ls_root_sem);
+ write_lock(&ls->ls_root_lock);
list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
list_del_init(&r->res_root_list);
dlm_put_rsb(r);
}
- up_write(&ls->ls_root_sem);
+ write_unlock(&ls->ls_root_lock);
}
void dlm_clear_toss(struct dlm_ls *ls)
--
2.31.1
* [Cluster-devel] [RFC dlm/next 08/10] fs: dlm: ls_recv_active semaphore to rwlock
2023-09-08 20:46 [Cluster-devel] [RFC dlm/next 01/10] fs: dlm: remove allocation parameter in msg allocation Alexander Aring
` (5 preceding siblings ...)
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 07/10] fs: dlm: ls_root_lock semaphore to rwlock Alexander Aring
@ 2023-09-08 20:46 ` Alexander Aring
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 09/10] fs: dlm: convert message parsing locks to disable bh Alexander Aring
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 10/10] fs: dlm: do dlm message processing in softirq context Alexander Aring
8 siblings, 0 replies; 10+ messages in thread
From: Alexander Aring @ 2023-09-08 20:46 UTC (permalink / raw)
To: teigland; +Cc: cluster-devel, gfs2
This patch converts the ls_recv_active rw_semaphore to an rwlock so we
do not sleep during dlm message processing.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/dlm_internal.h | 2 +-
fs/dlm/lock.c | 4 ++--
fs/dlm/lockspace.c | 2 +-
fs/dlm/member.c | 4 ++--
fs/dlm/recoverd.c | 4 ++--
5 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 9106e20e6c20..6a1b2c806f72 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -622,7 +622,7 @@ struct dlm_ls {
uint64_t ls_recover_seq;
struct dlm_recover *ls_recover_args;
struct rw_semaphore ls_in_recovery; /* block local requests */
- struct rw_semaphore ls_recv_active; /* block dlm_recv */
+ rwlock_t ls_recv_active; /* block dlm_recv */
struct list_head ls_requestqueue;/* queue remote requests */
rwlock_t ls_requestqueue_lock;
struct dlm_rcom *ls_recover_buf;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 1031f233a3ad..dccc0b888ca1 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -4831,7 +4831,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
be inactive (in this ls) before transitioning to recovery mode */
- down_read(&ls->ls_recv_active);
+ read_lock(&ls->ls_recv_active);
if (hd->h_cmd == DLM_MSG)
dlm_receive_message(ls, &p->message, nodeid);
else if (hd->h_cmd == DLM_RCOM)
@@ -4839,7 +4839,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
else
log_error(ls, "invalid h_cmd %d from %d lockspace %x",
hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
- up_read(&ls->ls_recv_active);
+ read_unlock(&ls->ls_recv_active);
dlm_put_lockspace(ls);
}
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 265d69752b90..e35ea06200b5 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -552,7 +552,7 @@ static int new_lockspace(const char *name, const char *cluster,
ls->ls_recover_seq = get_random_u64();
ls->ls_recover_args = NULL;
init_rwsem(&ls->ls_in_recovery);
- init_rwsem(&ls->ls_recv_active);
+ rwlock_init(&ls->ls_recv_active);
INIT_LIST_HEAD(&ls->ls_requestqueue);
rwlock_init(&ls->ls_requestqueue_lock);
spin_lock_init(&ls->ls_clear_proc_locks);
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index 707cebcdc533..ac1b555af9d6 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -630,7 +630,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
* message to the requestqueue without races.
*/
- down_write(&ls->ls_recv_active);
+ write_lock(&ls->ls_recv_active);
/*
* Abort any recovery that's in progress (see RECOVER_STOP,
@@ -654,7 +654,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
* requestqueue for later.
*/
- up_write(&ls->ls_recv_active);
+ write_unlock(&ls->ls_recv_active);
/*
* This in_recovery lock does two things:
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 4d17491dea2f..c47bcc8be398 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -32,7 +32,7 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
int error = -EINTR;
- down_write(&ls->ls_recv_active);
+ write_lock(&ls->ls_recv_active);
spin_lock(&ls->ls_recover_lock);
if (ls->ls_recover_seq == seq) {
@@ -44,7 +44,7 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
}
spin_unlock(&ls->ls_recover_lock);
- up_write(&ls->ls_recv_active);
+ write_unlock(&ls->ls_recv_active);
return error;
}
--
2.31.1
* [Cluster-devel] [RFC dlm/next 09/10] fs: dlm: convert message parsing locks to disable bh
2023-09-08 20:46 [Cluster-devel] [RFC dlm/next 01/10] fs: dlm: remove allocation parameter in msg allocation Alexander Aring
` (6 preceding siblings ...)
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 08/10] fs: dlm: ls_recv_active " Alexander Aring
@ 2023-09-08 20:46 ` Alexander Aring
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 10/10] fs: dlm: do dlm message processing in softirq context Alexander Aring
8 siblings, 0 replies; 10+ messages in thread
From: Alexander Aring @ 2023-09-08 20:46 UTC (permalink / raw)
To: teigland; +Cc: cluster-devel, gfs2
This patch converts all spinlocks involved in message parsing to their
_bh variants. The reason is that message parsing will move into softirq
context, so we must prevent these locks from being interrupted by a
softirq while they are held.
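For illustration, the deadlock the _bh variants prevent (a minimal
sketch, not code from the patch):

	/* process context, plain spin_lock(): softirqs stay enabled */
	spin_lock(&lkb->lkb_cb_lock);
	/* <-- a softirq fires here on the same CPU; once message
	 *     parsing runs in softirq context it may try to take
	 *     lkb_cb_lock again and deadlock
	 */
	spin_unlock(&lkb->lkb_cb_lock);

	/* the _bh variant disables softirqs while the lock is held */
	spin_lock_bh(&lkb->lkb_cb_lock);
	/* ... safe against the softirq-based parser ... */
	spin_unlock_bh(&lkb->lkb_cb_lock);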
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/ast.c | 28 +++----
fs/dlm/debug_fs.c | 32 ++++----
fs/dlm/dir.c | 14 ++--
fs/dlm/lock.c | 182 ++++++++++++++++++++++--------------------
fs/dlm/lock.h | 4 +-
fs/dlm/lockspace.c | 51 ++++++------
fs/dlm/lowcomms.c | 16 ++--
fs/dlm/member.c | 22 ++---
fs/dlm/midcomms.c | 40 +++++-----
fs/dlm/rcom.c | 26 +++---
fs/dlm/recover.c | 84 +++++++++----------
fs/dlm/recoverd.c | 12 +--
fs/dlm/requestqueue.c | 8 +-
fs/dlm/user.c | 34 ++++----
14 files changed, 281 insertions(+), 272 deletions(-)
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 1f2f70a1b824..e3c0903aaa6f 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -127,19 +127,19 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
return;
}
- spin_lock(&lkb->lkb_cb_lock);
+ spin_lock_bh(&lkb->lkb_cb_lock);
rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
switch (rv) {
case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
kref_get(&lkb->lkb_ref);
- spin_lock(&ls->ls_cb_lock);
+ spin_lock_bh(&ls->ls_cb_lock);
if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
} else {
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
}
- spin_unlock(&ls->ls_cb_lock);
+ spin_unlock_bh(&ls->ls_cb_lock);
break;
case DLM_ENQUEUE_CALLBACK_FAILURE:
WARN_ON_ONCE(1);
@@ -150,7 +150,7 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
WARN_ON_ONCE(1);
break;
}
- spin_unlock(&lkb->lkb_cb_lock);
+ spin_unlock_bh(&lkb->lkb_cb_lock);
}
void dlm_callback_work(struct work_struct *work)
@@ -162,14 +162,14 @@ void dlm_callback_work(struct work_struct *work)
struct dlm_callback *cb;
int rv;
- spin_lock(&lkb->lkb_cb_lock);
+ spin_lock_bh(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) {
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
- spin_unlock(&lkb->lkb_cb_lock);
+ spin_unlock_bh(&lkb->lkb_cb_lock);
goto out;
}
- spin_unlock(&lkb->lkb_cb_lock);
+ spin_unlock_bh(&lkb->lkb_cb_lock);
for (;;) {
castfn = lkb->lkb_astfn;
@@ -190,14 +190,14 @@ void dlm_callback_work(struct work_struct *work)
kref_put(&cb->ref, dlm_release_callback);
- spin_lock(&lkb->lkb_cb_lock);
+ spin_lock_bh(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) {
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
- spin_unlock(&lkb->lkb_cb_lock);
+ spin_unlock_bh(&lkb->lkb_cb_lock);
break;
}
- spin_unlock(&lkb->lkb_cb_lock);
+ spin_unlock_bh(&lkb->lkb_cb_lock);
}
out:
@@ -225,9 +225,9 @@ void dlm_callback_stop(struct dlm_ls *ls)
void dlm_callback_suspend(struct dlm_ls *ls)
{
if (ls->ls_callback_wq) {
- spin_lock(&ls->ls_cb_lock);
+ spin_lock_bh(&ls->ls_cb_lock);
set_bit(LSFL_CB_DELAY, &ls->ls_flags);
- spin_unlock(&ls->ls_cb_lock);
+ spin_unlock_bh(&ls->ls_cb_lock);
flush_workqueue(ls->ls_callback_wq);
}
@@ -245,7 +245,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
return;
more:
- spin_lock(&ls->ls_cb_lock);
+ spin_lock_bh(&ls->ls_cb_lock);
list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
list_del_init(&lkb->lkb_cb_list);
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
@@ -256,7 +256,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
empty = list_empty(&ls->ls_cb_delay);
if (empty)
clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
- spin_unlock(&ls->ls_cb_lock);
+ spin_unlock_bh(&ls->ls_cb_lock);
sum += count;
if (!empty) {
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 9d726971ba47..5726278dc62a 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -373,7 +373,7 @@ static void print_format5_lock(struct seq_file *s, struct dlm_lkb *lkb)
/* lkb_id lkb_flags mode flags sb_status sb_flags */
- spin_lock(&lkb->lkb_cb_lock);
+ spin_lock_bh(&lkb->lkb_cb_lock);
list_for_each_entry(cb, &lkb->lkb_callbacks, list) {
seq_printf(s, "%x %x %d %x %d %x\n",
lkb->lkb_id,
@@ -383,7 +383,7 @@ static void print_format5_lock(struct seq_file *s, struct dlm_lkb *lkb)
cb->sb_status,
cb->sb_flags);
}
- spin_unlock(&lkb->lkb_cb_lock);
+ spin_unlock_bh(&lkb->lkb_cb_lock);
}
static void print_format5(struct dlm_rsb *r, struct seq_file *s)
@@ -509,7 +509,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
- spin_lock(&ls->ls_rsbtbl[bucket].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(tree)) {
for (node = rb_first(tree); node; node = rb_next(node)) {
r = rb_entry(node, struct dlm_rsb, res_hashnode);
@@ -517,12 +517,12 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
dlm_hold_rsb(r);
ri->rsb = r;
ri->bucket = bucket;
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
return ri;
}
}
}
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
/*
* move to the first rsb in the next non-empty bucket
@@ -541,18 +541,18 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
}
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
- spin_lock(&ls->ls_rsbtbl[bucket].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(tree)) {
node = rb_first(tree);
r = rb_entry(node, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
ri->bucket = bucket;
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
*pos = n;
return ri;
}
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
}
}
@@ -573,7 +573,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
* move to the next rsb in the same bucket
*/
- spin_lock(&ls->ls_rsbtbl[bucket].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
rp = ri->rsb;
next = rb_next(&rp->res_hashnode);
@@ -581,12 +581,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
r = rb_entry(next, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
dlm_put_rsb(rp);
++*pos;
return ri;
}
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
dlm_put_rsb(rp);
/*
@@ -607,18 +607,18 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
}
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
- spin_lock(&ls->ls_rsbtbl[bucket].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(tree)) {
next = rb_first(tree);
r = rb_entry(next, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
ri->bucket = bucket;
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
*pos = n;
return ri;
}
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
}
}
@@ -824,7 +824,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
size_t len = DLM_DEBUG_BUF_LEN, pos = 0, ret, rv;
mutex_lock(&debug_buf_lock);
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
memset(debug_buf, 0, sizeof(debug_buf));
list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
@@ -835,7 +835,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
break;
pos += ret;
}
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
mutex_unlock(&debug_buf_lock);
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index c70e286f3dbc..7974296ff305 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -51,11 +51,11 @@ void dlm_recover_dir_nodeid(struct dlm_ls *ls)
{
struct dlm_rsb *r;
- read_lock(&ls->ls_root_lock);
+ read_lock_bh(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
}
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
}
int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
@@ -216,16 +216,16 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
if (!rv)
return r;
- read_lock(&ls->ls_root_lock);
+ read_lock_bh(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (len == r->res_length && !memcmp(name, r->res_name, len)) {
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
log_debug(ls, "find_rsb_root revert to root_list %s",
r->res_name);
return r;
}
}
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
return NULL;
}
@@ -241,7 +241,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
int offset = 0, dir_nodeid;
__be16 be_namelen;
- read_lock(&ls->ls_root_lock);
+ read_lock_bh(&ls->ls_root_lock);
if (inlen > 1) {
r = find_rsb_root(ls, inbuf, inlen);
@@ -302,6 +302,6 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
ls->ls_recover_dir_sent_msg++;
}
out:
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
}
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index dccc0b888ca1..1d2d6a246441 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -342,10 +342,15 @@ static void put_rsb(struct dlm_rsb *r)
uint32_t bucket = r->res_bucket;
int rv;
+ /* TODO we need a kref_put_lock_bh(), however this
+ * is only an optimization.
+ */
+ local_bh_disable();
rv = kref_put_lock(&r->res_ref, toss_rsb,
&ls->ls_rsbtbl[bucket].lock);
if (rv)
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ local_bh_enable();
}
void dlm_put_rsb(struct dlm_rsb *r)
@@ -358,17 +363,17 @@ static int pre_rsb_struct(struct dlm_ls *ls)
struct dlm_rsb *r1, *r2;
int count = 0;
- spin_lock(&ls->ls_new_rsb_spin);
+ spin_lock_bh(&ls->ls_new_rsb_spin);
if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
- spin_unlock(&ls->ls_new_rsb_spin);
+ spin_unlock_bh(&ls->ls_new_rsb_spin);
return 0;
}
- spin_unlock(&ls->ls_new_rsb_spin);
+ spin_unlock_bh(&ls->ls_new_rsb_spin);
r1 = dlm_allocate_rsb(ls);
r2 = dlm_allocate_rsb(ls);
- spin_lock(&ls->ls_new_rsb_spin);
+ spin_lock_bh(&ls->ls_new_rsb_spin);
if (r1) {
list_add(&r1->res_hashchain, &ls->ls_new_rsb);
ls->ls_new_rsb_count++;
@@ -378,7 +383,7 @@ static int pre_rsb_struct(struct dlm_ls *ls)
ls->ls_new_rsb_count++;
}
count = ls->ls_new_rsb_count;
- spin_unlock(&ls->ls_new_rsb_spin);
+ spin_unlock_bh(&ls->ls_new_rsb_spin);
if (!count)
return -ENOMEM;
@@ -395,10 +400,10 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
struct dlm_rsb *r;
int count;
- spin_lock(&ls->ls_new_rsb_spin);
+ spin_lock_bh(&ls->ls_new_rsb_spin);
if (list_empty(&ls->ls_new_rsb)) {
count = ls->ls_new_rsb_count;
- spin_unlock(&ls->ls_new_rsb_spin);
+ spin_unlock_bh(&ls->ls_new_rsb_spin);
log_debug(ls, "find_rsb retry %d %d %s",
count, dlm_config.ci_new_rsb_count,
(const char *)name);
@@ -410,7 +415,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
/* Convert the empty list_head to a NULL rb_node for tree usage: */
memset(&r->res_hashnode, 0, sizeof(struct rb_node));
ls->ls_new_rsb_count--;
- spin_unlock(&ls->ls_new_rsb_spin);
+ spin_unlock_bh(&ls->ls_new_rsb_spin);
r->res_ls = ls;
r->res_length = len;
@@ -584,7 +589,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
goto out;
}
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (error)
@@ -654,7 +659,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
error = get_rsb_struct(ls, name, len, &r);
if (error == -EAGAIN) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
goto retry;
}
if (error)
@@ -703,7 +708,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
out_add:
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
out_unlock:
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
out:
*r_ret = r;
return error;
@@ -728,7 +733,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
if (error < 0)
goto out;
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (error)
@@ -786,7 +791,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
error = get_rsb_struct(ls, name, len, &r);
if (error == -EAGAIN) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
goto retry;
}
if (error)
@@ -801,7 +806,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
out_unlock:
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
out:
*r_ret = r;
return error;
@@ -1018,7 +1023,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
if (error < 0)
return error;
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (!error) {
/* because the rsb is active, we need to lock_rsb before
@@ -1026,7 +1031,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
*/
hold_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
lock_rsb(r);
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
@@ -1052,14 +1057,14 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
r->res_toss_time = jiffies;
/* the rsb was inactive (on toss list) */
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return 0;
not_found:
error = get_rsb_struct(ls, name, len, &r);
if (error == -EAGAIN) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
goto retry;
}
if (error)
@@ -1077,7 +1082,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
if (error) {
/* should never happen */
dlm_free_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
goto retry;
}
@@ -1085,7 +1090,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
*result = DLM_LU_ADD;
*r_nodeid = from_nodeid;
out_unlock:
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return error;
}
@@ -1096,13 +1101,13 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
int i;
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
- spin_lock(&ls->ls_rsbtbl[i].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (r->res_hash == hash)
dlm_dump_rsb(r);
}
- spin_unlock(&ls->ls_rsbtbl[i].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
}
}
@@ -1115,7 +1120,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (!error)
goto out_dump;
@@ -1126,7 +1131,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
out_dump:
dlm_dump_rsb(r);
out:
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
}
static void toss_rsb(struct kref *kref)
@@ -1208,11 +1213,11 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
spin_lock_init(&lkb->lkb_cb_lock);
INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
- spin_lock(&ls->ls_lkbidr_spin);
+ spin_lock_bh(&ls->ls_lkbidr_spin);
rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
if (rv >= 0)
lkb->lkb_id = rv;
- spin_unlock(&ls->ls_lkbidr_spin);
+ spin_unlock_bh(&ls->ls_lkbidr_spin);
if (rv < 0) {
log_error(ls, "create_lkb idr error %d", rv);
@@ -1233,11 +1238,11 @@ static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
struct dlm_lkb *lkb;
- spin_lock(&ls->ls_lkbidr_spin);
+ spin_lock_bh(&ls->ls_lkbidr_spin);
lkb = idr_find(&ls->ls_lkbidr, lkid);
if (lkb)
kref_get(&lkb->lkb_ref);
- spin_unlock(&ls->ls_lkbidr_spin);
+ spin_unlock_bh(&ls->ls_lkbidr_spin);
*lkb_ret = lkb;
return lkb ? 0 : -ENOENT;
@@ -1261,6 +1266,10 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
uint32_t lkid = lkb->lkb_id;
int rv;
+ /* TODO we need a kref_put_lock_bh(), however this
+ * is only an optimization.
+ */
+ local_bh_disable();
rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
&ls->ls_lkbidr_spin);
if (rv) {
@@ -1274,6 +1283,7 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
dlm_free_lvb(lkb->lkb_lvbptr);
dlm_free_lkb(lkb);
}
+ local_bh_enable();
return rv;
}
@@ -1407,7 +1417,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
int error = 0;
int wc;
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
if (is_overlap_unlock(lkb) ||
(is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
@@ -1447,7 +1457,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
log_error(ls, "addwait error %x %d flags %x %d %d %s",
lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
lkb->lkb_wait_type, lkb->lkb_resource->res_name);
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
return error;
}
@@ -1546,9 +1556,9 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error;
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
error = _remove_from_waiters(lkb, mstype, NULL);
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
return error;
}
@@ -1563,10 +1573,10 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
int error;
if (!local)
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
if (!local)
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
return error;
}
@@ -1582,10 +1592,10 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return;
}
@@ -1642,7 +1652,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
else
clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
/*
* While searching for rsb's to free, we found some that require
@@ -1657,16 +1667,16 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
name = ls->ls_remove_names[i];
len = ls->ls_remove_lens[i];
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (rv) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
log_debug(ls, "remove_name not toss %s", name);
continue;
}
if (r->res_master_nodeid != our_nodeid) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
log_debug(ls, "remove_name master %d dir %d our %d %s",
r->res_master_nodeid, r->res_dir_nodeid,
our_nodeid, name);
@@ -1675,7 +1685,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
if (r->res_dir_nodeid == our_nodeid) {
/* should never happen */
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
log_error(ls, "remove_name dir %d master %d our %d %s",
r->res_dir_nodeid, r->res_master_nodeid,
our_nodeid, name);
@@ -1684,21 +1694,21 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
if (!time_after_eq(jiffies, r->res_toss_time +
dlm_config.ci_toss_secs * HZ)) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
log_debug(ls, "remove_name toss_time %lu now %lu %s",
r->res_toss_time, jiffies, name);
continue;
}
if (!kref_put(&r->res_ref, kill_rsb)) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
log_error(ls, "remove_name in use %s", name);
continue;
}
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
send_remove(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
dlm_free_rsb(r);
}
@@ -4163,7 +4173,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (rv) {
@@ -4173,7 +4183,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
/* should not happen */
log_error(ls, "receive_remove from %d not found %s",
from_nodeid, name);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return;
}
if (r->res_master_nodeid != from_nodeid) {
@@ -4181,14 +4191,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
log_error(ls, "receive_remove keep from %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return;
}
log_debug(ls, "receive_remove from %d master %d first %x %s",
from_nodeid, r->res_master_nodeid, r->res_first_lkid,
name);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return;
}
@@ -4196,19 +4206,19 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
log_error(ls, "receive_remove toss from %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return;
}
if (kref_put(&r->res_ref, kill_rsb)) {
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
dlm_free_rsb(r);
} else {
log_error(ls, "receive_remove from %d rsb ref error",
from_nodeid);
dlm_print_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
}
}
@@ -4747,20 +4757,20 @@ static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
int nodeid)
{
try_again:
- read_lock(&ls->ls_requestqueue_lock);
+ read_lock_bh(&ls->ls_requestqueue_lock);
if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
/* If we were a member of this lockspace, left, and rejoined,
other nodes may still be sending us messages from the
lockspace generation before we left. */
if (WARN_ON_ONCE(!ls->ls_generation)) {
- read_unlock(&ls->ls_requestqueue_lock);
+ read_unlock_bh(&ls->ls_requestqueue_lock);
log_limit(ls, "receive %d from %d ignore old gen",
le32_to_cpu(ms->m_type), nodeid);
return;
}
- read_unlock(&ls->ls_requestqueue_lock);
- write_lock(&ls->ls_requestqueue_lock);
+ read_unlock_bh(&ls->ls_requestqueue_lock);
+ write_lock_bh(&ls->ls_requestqueue_lock);
/* recheck because we hold writelock now */
if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
write_unlock_bh(&ls->ls_requestqueue_lock);
@@ -4768,10 +4778,10 @@ static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
}
dlm_add_requestqueue(ls, nodeid, ms);
- write_unlock(&ls->ls_requestqueue_lock);
+ write_unlock_bh(&ls->ls_requestqueue_lock);
} else {
_receive_message(ls, ms, 0);
- read_unlock(&ls->ls_requestqueue_lock);
+ read_unlock_bh(&ls->ls_requestqueue_lock);
}
}
@@ -4831,7 +4841,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
be inactive (in this ls) before transitioning to recovery mode */
- read_lock(&ls->ls_recv_active);
+ read_lock_bh(&ls->ls_recv_active);
if (hd->h_cmd == DLM_MSG)
dlm_receive_message(ls, &p->message, nodeid);
else if (hd->h_cmd == DLM_RCOM)
@@ -4839,7 +4849,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
else
log_error(ls, "invalid h_cmd %d from %d lockspace %x",
hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
- read_unlock(&ls->ls_recv_active);
+ read_unlock_bh(&ls->ls_recv_active);
dlm_put_lockspace(ls);
}
@@ -4900,7 +4910,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
if (!ms_local)
return;
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
@@ -4993,7 +5003,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
lkb->lkb_wait_type, wait_type);
}
}
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
kfree(ms_local);
}
@@ -5001,7 +5011,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
{
struct dlm_lkb *lkb = NULL, *iter;
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
hold_lkb(iter);
@@ -5009,7 +5019,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
break;
}
}
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
return lkb;
}
@@ -5076,9 +5086,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
while (!atomic_dec_and_test(&lkb->lkb_wait_count))
unhold_lkb(lkb);
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
list_del_init(&lkb->lkb_wait_reply);
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
if (oc || ou) {
/* do an unlock or cancel instead of resending */
@@ -5216,7 +5226,7 @@ void dlm_recover_purge(struct dlm_ls *ls)
if (!nodes_count)
return;
- write_lock(&ls->ls_root_lock);
+ write_lock_bh(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
hold_rsb(r);
lock_rsb(r);
@@ -5231,7 +5241,7 @@ void dlm_recover_purge(struct dlm_ls *ls)
unlock_rsb(r);
unhold_rsb(r);
}
- write_unlock(&ls->ls_root_lock);
+ write_unlock_bh(&ls->ls_root_lock);
if (lkb_count)
log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
@@ -5243,7 +5253,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
struct rb_node *n;
struct dlm_rsb *r;
- spin_lock(&ls->ls_rsbtbl[bucket].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
@@ -5254,10 +5264,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
continue;
}
hold_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
return r;
}
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
return NULL;
}
@@ -5601,10 +5611,10 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
}
/* add this new lkb to the per-process list of locks */
- spin_lock(&ua->proc->locks_spin);
+ spin_lock_bh(&ua->proc->locks_spin);
hold_lkb(lkb);
list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
- spin_unlock(&ua->proc->locks_spin);
+ spin_unlock_bh(&ua->proc->locks_spin);
do_put = false;
out_put:
trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
@@ -5734,9 +5744,9 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
* for the proc locks list.
*/
- spin_lock(&ua->proc->locks_spin);
+ spin_lock_bh(&ua->proc->locks_spin);
list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
- spin_unlock(&ua->proc->locks_spin);
+ spin_unlock_bh(&ua->proc->locks_spin);
out:
kfree(ua_tmp);
return rv;
@@ -5780,11 +5790,11 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
if (error)
goto out_put;
- spin_lock(&ua->proc->locks_spin);
+ spin_lock_bh(&ua->proc->locks_spin);
/* dlm_user_add_cb() may have already taken lkb off the proc list */
if (!list_empty(&lkb->lkb_ownqueue))
list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
- spin_unlock(&ua->proc->locks_spin);
+ spin_unlock_bh(&ua->proc->locks_spin);
out_put:
trace_dlm_unlock_end(ls, lkb, flags, error);
dlm_put_lkb(lkb);
@@ -5935,7 +5945,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
{
struct dlm_lkb *lkb = NULL;
- spin_lock(&ls->ls_clear_proc_locks);
+ spin_lock_bh(&ls->ls_clear_proc_locks);
if (list_empty(&proc->locks))
goto out;
@@ -5947,7 +5957,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
else
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
out:
- spin_unlock(&ls->ls_clear_proc_locks);
+ spin_unlock_bh(&ls->ls_clear_proc_locks);
return lkb;
}
@@ -5983,7 +5993,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
dlm_put_lkb(lkb);
}
- spin_lock(&ls->ls_clear_proc_locks);
+ spin_lock_bh(&ls->ls_clear_proc_locks);
/* in-progress unlocks */
list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
@@ -5998,7 +6008,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
dlm_put_lkb(lkb);
}
- spin_unlock(&ls->ls_clear_proc_locks);
+ spin_unlock_bh(&ls->ls_clear_proc_locks);
dlm_unlock_recovery(ls);
}
@@ -6008,13 +6018,13 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
while (1) {
lkb = NULL;
- spin_lock(&proc->locks_spin);
+ spin_lock_bh(&proc->locks_spin);
if (!list_empty(&proc->locks)) {
lkb = list_entry(proc->locks.next, struct dlm_lkb,
lkb_ownqueue);
list_del_init(&lkb->lkb_ownqueue);
}
- spin_unlock(&proc->locks_spin);
+ spin_unlock_bh(&proc->locks_spin);
if (!lkb)
break;
@@ -6024,21 +6034,21 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
dlm_put_lkb(lkb); /* ref from proc->locks list */
}
- spin_lock(&proc->locks_spin);
+ spin_lock_bh(&proc->locks_spin);
list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
list_del_init(&lkb->lkb_ownqueue);
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
dlm_put_lkb(lkb);
}
- spin_unlock(&proc->locks_spin);
+ spin_unlock_bh(&proc->locks_spin);
- spin_lock(&proc->asts_spin);
+ spin_lock_bh(&proc->asts_spin);
list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
dlm_purge_lkb_callbacks(lkb);
list_del_init(&lkb->lkb_cb_list);
dlm_put_lkb(lkb);
}
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
}
/* pid of 0 means purge all orphans */
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 1428fa3482d7..ac9fb73f5445 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -68,12 +68,12 @@ static inline int is_master(struct dlm_rsb *r)
static inline void lock_rsb(struct dlm_rsb *r)
{
- spin_lock(&r->res_lock);
+ spin_lock_bh(&r->res_lock);
}
static inline void unlock_rsb(struct dlm_rsb *r)
{
- spin_unlock(&r->res_lock);
+ spin_unlock_bh(&r->res_lock);
}
#endif
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index e35ea06200b5..a1d04175c4bf 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -251,15 +251,15 @@ static struct dlm_ls *find_ls_to_scan(void)
{
struct dlm_ls *ls;
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (time_after_eq(jiffies, ls->ls_scan_time +
dlm_config.ci_scan_secs * HZ)) {
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
return ls;
}
}
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
return NULL;
}
@@ -306,7 +306,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
struct dlm_ls *ls;
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (ls->ls_global_id == id) {
@@ -316,7 +316,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
}
ls = NULL;
out:
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
return ls;
}
@@ -324,7 +324,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
struct dlm_ls *ls;
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (ls->ls_local_handle == lockspace) {
atomic_inc(&ls->ls_count);
@@ -333,7 +333,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
}
ls = NULL;
out:
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
return ls;
}
@@ -341,7 +341,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor)
{
struct dlm_ls *ls;
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (ls->ls_device.minor == minor) {
atomic_inc(&ls->ls_count);
@@ -350,7 +350,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor)
}
ls = NULL;
out:
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
return ls;
}
@@ -365,15 +365,15 @@ static void remove_lockspace(struct dlm_ls *ls)
retry:
wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
if (atomic_read(&ls->ls_count) != 0) {
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
goto retry;
}
WARN_ON(ls->ls_create_count != 0);
list_del(&ls->ls_list);
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
}
static int threads_start(void)
@@ -448,7 +448,7 @@ static int new_lockspace(const char *name, const char *cluster,
error = 0;
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
WARN_ON(ls->ls_create_count <= 0);
if (ls->ls_namelen != namelen)
@@ -464,7 +464,7 @@ static int new_lockspace(const char *name, const char *cluster,
error = 1;
break;
}
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
if (error)
goto out;
@@ -581,10 +581,10 @@ static int new_lockspace(const char *name, const char *cluster,
INIT_LIST_HEAD(&ls->ls_root_list);
rwlock_init(&ls->ls_root_lock);
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
ls->ls_create_count = 1;
list_add(&ls->ls_list, &lslist);
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
if (flags & DLM_LSFL_FS) {
error = dlm_callback_start(ls);
@@ -653,9 +653,9 @@ static int new_lockspace(const char *name, const char *cluster,
out_callback:
dlm_callback_stop(ls);
out_delist:
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_del(&ls->ls_list);
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
idr_destroy(&ls->ls_recover_idr);
kfree(ls->ls_recover_buf);
out_lkbidr:
@@ -754,7 +754,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
{
int rv;
- spin_lock(&ls->ls_lkbidr_spin);
+ spin_lock_bh(&ls->ls_lkbidr_spin);
if (force == 0) {
rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
} else if (force == 1) {
@@ -762,7 +762,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
} else {
rv = 0;
}
- spin_unlock(&ls->ls_lkbidr_spin);
+ spin_unlock_bh(&ls->ls_lkbidr_spin);
return rv;
}
@@ -774,7 +774,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
busy = lockspace_busy(ls, force);
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
if (ls->ls_create_count == 1) {
if (busy) {
rv = -EBUSY;
@@ -788,7 +788,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
} else {
rv = -EINVAL;
}
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
if (rv) {
log_debug(ls, "release_lockspace no remove %d", rv);
@@ -916,20 +916,19 @@ void dlm_stop_lockspaces(void)
restart:
count = 0;
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
count++;
continue;
}
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
log_error(ls, "no userland control daemon, stopping lockspace");
dlm_ls_stop(ls);
goto restart;
}
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
if (count)
log_print("dlm user daemon left %d lockspaces", count);
}
-
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index db71982d709d..28dd74aebc84 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -867,36 +867,36 @@ static void process_dlm_messages(struct work_struct *work)
{
struct processqueue_entry *pentry;
- spin_lock(&processqueue_lock);
+ spin_lock_bh(&processqueue_lock);
pentry = list_first_entry_or_null(&processqueue,
struct processqueue_entry, list);
if (WARN_ON_ONCE(!pentry)) {
process_dlm_messages_pending = false;
- spin_unlock(&processqueue_lock);
+ spin_unlock_bh(&processqueue_lock);
return;
}
list_del(&pentry->list);
atomic_dec(&processqueue_count);
- spin_unlock(&processqueue_lock);
+ spin_unlock_bh(&processqueue_lock);
for (;;) {
dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
pentry->buflen);
free_processqueue_entry(pentry);
- spin_lock(&processqueue_lock);
+ spin_lock_bh(&processqueue_lock);
pentry = list_first_entry_or_null(&processqueue,
struct processqueue_entry, list);
if (!pentry) {
process_dlm_messages_pending = false;
- spin_unlock(&processqueue_lock);
+ spin_unlock_bh(&processqueue_lock);
break;
}
list_del(&pentry->list);
atomic_dec(&processqueue_count);
- spin_unlock(&processqueue_lock);
+ spin_unlock_bh(&processqueue_lock);
}
}
@@ -966,14 +966,14 @@ static int receive_from_sock(struct connection *con, int buflen)
memmove(con->rx_leftover_buf, pentry->buf + ret,
con->rx_leftover);
- spin_lock(&processqueue_lock);
+ spin_lock_bh(&processqueue_lock);
ret = atomic_inc_return(&processqueue_count);
list_add_tail(&pentry->list, &processqueue);
if (!process_dlm_messages_pending) {
process_dlm_messages_pending = true;
queue_work(process_workqueue, &process_work);
}
- spin_unlock(&processqueue_lock);
+ spin_unlock_bh(&processqueue_lock);
if (ret > DLM_MAX_PROCESS_BUFFERS)
return DLM_IO_FLUSH;
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index ac1b555af9d6..6401916a97ef 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -630,7 +630,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
* message to the requestqueue without races.
*/
- write_lock(&ls->ls_recv_active);
+ write_lock_bh(&ls->ls_recv_active);
/*
* Abort any recovery that's in progress (see RECOVER_STOP,
@@ -638,23 +638,23 @@ int dlm_ls_stop(struct dlm_ls *ls)
* dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
*/
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
ls->ls_recover_seq++;
/* activate requestqueue and stop processing */
- write_lock(&ls->ls_requestqueue_lock);
+ write_lock_bh(&ls->ls_requestqueue_lock);
set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
- write_unlock(&ls->ls_requestqueue_lock);
- spin_unlock(&ls->ls_recover_lock);
+ write_unlock_bh(&ls->ls_requestqueue_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
/*
* Let dlm_recv run again, now any normal messages will be saved on the
* requestqueue for later.
*/
- write_unlock(&ls->ls_recv_active);
+ write_unlock_bh(&ls->ls_recv_active);
/*
* This in_recovery lock does two things:
@@ -679,13 +679,13 @@ int dlm_ls_stop(struct dlm_ls *ls)
dlm_recoverd_suspend(ls);
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
kfree(ls->ls_slots);
ls->ls_slots = NULL;
ls->ls_num_slots = 0;
ls->ls_slots_size = 0;
ls->ls_recover_status = 0;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
dlm_recoverd_resume(ls);
@@ -719,12 +719,12 @@ int dlm_ls_start(struct dlm_ls *ls)
if (error < 0)
goto fail_rv;
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
/* the lockspace needs to be stopped before it can be started */
if (!dlm_locking_stopped(ls)) {
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
log_error(ls, "start ignored: lockspace running");
error = -EINVAL;
goto fail;
@@ -735,7 +735,7 @@ int dlm_ls_start(struct dlm_ls *ls)
rv->seq = ++ls->ls_recover_seq;
rv_old = ls->ls_recover_args;
ls->ls_recover_args = rv;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
if (rv_old) {
log_error(ls, "unused recovery %llx %d",
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index ea0559e2a44d..7d75f86450d6 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -365,9 +365,9 @@ int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
node->users = 0;
midcomms_node_reset(node);
- spin_lock(&nodes_lock);
+ spin_lock_bh(&nodes_lock);
hlist_add_head_rcu(&node->hlist, &node_hash[r]);
- spin_unlock(&nodes_lock);
+ spin_unlock_bh(&nodes_lock);
node->debugfs = dlm_create_debug_comms_file(nodeid, node);
return 0;
@@ -478,7 +478,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
{
- spin_lock(&node->state_lock);
+ spin_lock_bh(&node->state_lock);
pr_debug("receive passive fin ack from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
@@ -492,13 +492,13 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
wake_up(&node->shutdown_wait);
break;
default:
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
}
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
}
static void dlm_receive_buffer_3_2_trace(uint32_t seq,
@@ -535,7 +535,7 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
if (is_expected_seq) {
switch (p->header.h_cmd) {
case DLM_FIN:
- spin_lock(&node->state_lock);
+ spin_lock_bh(&node->state_lock);
pr_debug("receive fin msg from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
@@ -576,13 +576,13 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
/* probably remove_member caught it, do nothing */
break;
default:
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
}
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
break;
default:
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
@@ -1183,7 +1183,7 @@ void dlm_midcomms_exit(void)
static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
{
- spin_lock(&node->state_lock);
+ spin_lock_bh(&node->state_lock);
pr_debug("receive active fin ack from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
@@ -1203,13 +1203,13 @@ static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
wake_up(&node->shutdown_wait);
break;
default:
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
}
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
}
void dlm_midcomms_add_member(int nodeid)
@@ -1224,7 +1224,7 @@ void dlm_midcomms_add_member(int nodeid)
return;
}
- spin_lock(&node->state_lock);
+ spin_lock_bh(&node->state_lock);
if (!node->users) {
pr_debug("receive add member from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
@@ -1252,7 +1252,7 @@ void dlm_midcomms_add_member(int nodeid)
node->users++;
pr_debug("node %d users inc count %d\n", nodeid, node->users);
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
}
@@ -1270,13 +1270,13 @@ void dlm_midcomms_remove_member(int nodeid)
return;
}
- spin_lock(&node->state_lock);
+ spin_lock_bh(&node->state_lock);
/* case of dlm_midcomms_addr() created node but
* was not added before because dlm_midcomms_close()
* removed the node
*/
if (!node->users) {
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
return;
}
@@ -1314,7 +1314,7 @@ void dlm_midcomms_remove_member(int nodeid)
break;
}
}
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
}
@@ -1352,7 +1352,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
return;
}
- spin_lock(&node->state_lock);
+ spin_lock_bh(&node->state_lock);
pr_debug("receive active shutdown for node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
switch (node->state) {
@@ -1371,7 +1371,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
*/
break;
}
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
if (DLM_DEBUG_FENCE_TERMINATION)
msleep(5000);
@@ -1442,9 +1442,9 @@ int dlm_midcomms_close(int nodeid)
ret = dlm_lowcomms_close(nodeid);
dlm_delete_debug_comms_file(node->debugfs);
- spin_lock(&nodes_lock);
+ spin_lock_bh(&nodes_lock);
hlist_del_rcu(&node->hlist);
- spin_unlock(&nodes_lock);
+ spin_unlock_bh(&nodes_lock);
srcu_read_unlock(&nodes_srcu, idx);
/* wait that all readers left until flush send queue */
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 2e3f529f3ff2..be1a71a6303a 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -143,18 +143,18 @@ static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq)
{
- spin_lock(&ls->ls_rcom_spin);
+ spin_lock_bh(&ls->ls_rcom_spin);
*new_seq = cpu_to_le64(++ls->ls_rcom_seq);
set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
- spin_unlock(&ls->ls_rcom_spin);
+ spin_unlock_bh(&ls->ls_rcom_spin);
}
static void disallow_sync_reply(struct dlm_ls *ls)
{
- spin_lock(&ls->ls_rcom_spin);
+ spin_lock_bh(&ls->ls_rcom_spin);
clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
- spin_unlock(&ls->ls_rcom_spin);
+ spin_unlock_bh(&ls->ls_rcom_spin);
}
/*
@@ -245,10 +245,10 @@ static void receive_rcom_status(struct dlm_ls *ls,
goto do_create;
}
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
status = ls->ls_recover_status;
num_slots = ls->ls_num_slots;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
len += num_slots * sizeof(struct rcom_slot);
do_create:
@@ -266,9 +266,9 @@ static void receive_rcom_status(struct dlm_ls *ls,
if (!num_slots)
goto do_send;
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
if (ls->ls_num_slots != num_slots) {
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
log_debug(ls, "receive_rcom_status num_slots %d to %d",
num_slots, ls->ls_num_slots);
rc->rc_result = 0;
@@ -277,7 +277,7 @@ static void receive_rcom_status(struct dlm_ls *ls,
}
dlm_slots_copy_out(ls, rc);
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
do_send:
send_rcom_stateless(msg, rc);
@@ -285,7 +285,7 @@ static void receive_rcom_status(struct dlm_ls *ls,
static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
{
- spin_lock(&ls->ls_rcom_spin);
+ spin_lock_bh(&ls->ls_rcom_spin);
if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) {
log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
@@ -301,7 +301,7 @@ static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
wake_up(&ls->ls_wait_general);
out:
- spin_unlock(&ls->ls_rcom_spin);
+ spin_unlock_bh(&ls->ls_rcom_spin);
}
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
@@ -613,11 +613,11 @@ void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid)
break;
}
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
status = ls->ls_recover_status;
stop = dlm_recovery_stopped(ls);
seq = ls->ls_recover_seq;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS)))
goto ignore;
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 0d5b0f94eb46..64d94e6ea8fb 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -74,9 +74,9 @@ int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
uint32_t dlm_recover_status(struct dlm_ls *ls)
{
uint32_t status;
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
status = ls->ls_recover_status;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
return status;
}
@@ -87,9 +87,9 @@ static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
{
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
_set_recover_status(ls, status);
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
}
static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
@@ -188,13 +188,13 @@ int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq)
rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
if (!rv) {
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
_set_recover_status(ls, DLM_RS_NODES_ALL);
ls->ls_num_slots = num_slots;
ls->ls_slots_size = slots_size;
ls->ls_slots = slots;
ls->ls_generation = gen;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
} else {
dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
}
@@ -241,9 +241,9 @@ static int recover_list_empty(struct dlm_ls *ls)
{
int empty;
- spin_lock(&ls->ls_recover_list_lock);
+ spin_lock_bh(&ls->ls_recover_list_lock);
empty = list_empty(&ls->ls_recover_list);
- spin_unlock(&ls->ls_recover_list_lock);
+ spin_unlock_bh(&ls->ls_recover_list_lock);
return empty;
}
@@ -252,23 +252,23 @@ static void recover_list_add(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
- spin_lock(&ls->ls_recover_list_lock);
+ spin_lock_bh(&ls->ls_recover_list_lock);
if (list_empty(&r->res_recover_list)) {
list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
ls->ls_recover_list_count++;
dlm_hold_rsb(r);
}
- spin_unlock(&ls->ls_recover_list_lock);
+ spin_unlock_bh(&ls->ls_recover_list_lock);
}
static void recover_list_del(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
- spin_lock(&ls->ls_recover_list_lock);
+ spin_lock_bh(&ls->ls_recover_list_lock);
list_del_init(&r->res_recover_list);
ls->ls_recover_list_count--;
- spin_unlock(&ls->ls_recover_list_lock);
+ spin_unlock_bh(&ls->ls_recover_list_lock);
dlm_put_rsb(r);
}
@@ -277,7 +277,7 @@ static void recover_list_clear(struct dlm_ls *ls)
{
struct dlm_rsb *r, *s;
- spin_lock(&ls->ls_recover_list_lock);
+ spin_lock_bh(&ls->ls_recover_list_lock);
list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
list_del_init(&r->res_recover_list);
r->res_recover_locks_count = 0;
@@ -290,17 +290,17 @@ static void recover_list_clear(struct dlm_ls *ls)
ls->ls_recover_list_count);
ls->ls_recover_list_count = 0;
}
- spin_unlock(&ls->ls_recover_list_lock);
+ spin_unlock_bh(&ls->ls_recover_list_lock);
}
static int recover_idr_empty(struct dlm_ls *ls)
{
int empty = 1;
- spin_lock(&ls->ls_recover_idr_lock);
+ spin_lock_bh(&ls->ls_recover_idr_lock);
if (ls->ls_recover_list_count)
empty = 0;
- spin_unlock(&ls->ls_recover_idr_lock);
+ spin_unlock_bh(&ls->ls_recover_idr_lock);
return empty;
}
@@ -310,7 +310,7 @@ static int recover_idr_add(struct dlm_rsb *r)
struct dlm_ls *ls = r->res_ls;
int rv;
- spin_lock(&ls->ls_recover_idr_lock);
+ spin_lock_bh(&ls->ls_recover_idr_lock);
if (r->res_id) {
rv = -1;
goto out_unlock;
@@ -324,7 +324,7 @@ static int recover_idr_add(struct dlm_rsb *r)
dlm_hold_rsb(r);
rv = 0;
out_unlock:
- spin_unlock(&ls->ls_recover_idr_lock);
+ spin_unlock_bh(&ls->ls_recover_idr_lock);
return rv;
}
@@ -332,11 +332,11 @@ static void recover_idr_del(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
- spin_lock(&ls->ls_recover_idr_lock);
+ spin_lock_bh(&ls->ls_recover_idr_lock);
idr_remove(&ls->ls_recover_idr, r->res_id);
r->res_id = 0;
ls->ls_recover_list_count--;
- spin_unlock(&ls->ls_recover_idr_lock);
+ spin_unlock_bh(&ls->ls_recover_idr_lock);
dlm_put_rsb(r);
}
@@ -345,9 +345,9 @@ static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
{
struct dlm_rsb *r;
- spin_lock(&ls->ls_recover_idr_lock);
+ spin_lock_bh(&ls->ls_recover_idr_lock);
r = idr_find(&ls->ls_recover_idr, (int)id);
- spin_unlock(&ls->ls_recover_idr_lock);
+ spin_unlock_bh(&ls->ls_recover_idr_lock);
return r;
}
@@ -356,7 +356,7 @@ static void recover_idr_clear(struct dlm_ls *ls)
struct dlm_rsb *r;
int id;
- spin_lock(&ls->ls_recover_idr_lock);
+ spin_lock_bh(&ls->ls_recover_idr_lock);
idr_for_each_entry(&ls->ls_recover_idr, r, id) {
idr_remove(&ls->ls_recover_idr, id);
@@ -372,7 +372,7 @@ static void recover_idr_clear(struct dlm_ls *ls)
ls->ls_recover_list_count);
ls->ls_recover_list_count = 0;
}
- spin_unlock(&ls->ls_recover_idr_lock);
+ spin_unlock_bh(&ls->ls_recover_idr_lock);
}
@@ -529,10 +529,10 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
log_rinfo(ls, "dlm_recover_masters");
- read_lock(&ls->ls_root_lock);
+ read_lock_bh(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (dlm_recovery_stopped(ls)) {
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
error = -EINTR;
goto out;
}
@@ -546,11 +546,11 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
total++;
if (error) {
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
goto out;
}
}
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);
@@ -660,7 +660,7 @@ int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
struct dlm_rsb *r;
int error, count = 0;
- read_lock(&ls->ls_root_lock);
+ read_lock_bh(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (is_master(r)) {
rsb_clear_flag(r, RSB_NEW_MASTER);
@@ -672,19 +672,19 @@ int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
if (dlm_recovery_stopped(ls)) {
error = -EINTR;
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
goto out;
}
error = recover_locks(r, seq);
if (error) {
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
goto out;
}
count += r->res_recover_locks_count;
}
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
log_rinfo(ls, "dlm_recover_locks %d out", count);
@@ -858,7 +858,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
struct dlm_rsb *r;
unsigned int count = 0;
- read_lock(&ls->ls_root_lock);
+ read_lock_bh(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
lock_rsb(r);
if (is_master(r)) {
@@ -880,7 +880,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
rsb_clear_flag(r, RSB_NEW_MASTER2);
unlock_rsb(r);
}
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
if (count)
log_rinfo(ls, "dlm_recover_rsbs %d done", count);
@@ -894,7 +894,7 @@ int dlm_create_root_list(struct dlm_ls *ls)
struct dlm_rsb *r;
int i, error = 0;
- write_lock(&ls->ls_root_lock);
+ write_lock_bh(&ls->ls_root_lock);
if (!list_empty(&ls->ls_root_list)) {
log_error(ls, "root list not empty");
error = -EINVAL;
@@ -902,7 +902,7 @@ int dlm_create_root_list(struct dlm_ls *ls)
}
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
- spin_lock(&ls->ls_rsbtbl[i].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
list_add(&r->res_root_list, &ls->ls_root_list);
@@ -911,10 +911,10 @@ int dlm_create_root_list(struct dlm_ls *ls)
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
log_error(ls, "dlm_create_root_list toss not empty");
- spin_unlock(&ls->ls_rsbtbl[i].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
}
out:
- write_unlock(&ls->ls_root_lock);
+ write_unlock_bh(&ls->ls_root_lock);
return error;
}
@@ -922,12 +922,12 @@ void dlm_release_root_list(struct dlm_ls *ls)
{
struct dlm_rsb *r, *safe;
- write_lock(&ls->ls_root_lock);
+ write_lock_bh(&ls->ls_root_lock);
list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
list_del_init(&r->res_root_list);
dlm_put_rsb(r);
}
- write_unlock(&ls->ls_root_lock);
+ write_unlock_bh(&ls->ls_root_lock);
}
void dlm_clear_toss(struct dlm_ls *ls)
@@ -938,7 +938,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
int i;
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
- spin_lock(&ls->ls_rsbtbl[i].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
next = rb_next(n);
r = rb_entry(n, struct dlm_rsb, res_hashnode);
@@ -946,7 +946,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
dlm_free_rsb(r);
count++;
}
- spin_unlock(&ls->ls_rsbtbl[i].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
}
if (count)
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index c47bcc8be398..a774ebbf6ccb 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -32,9 +32,9 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
int error = -EINTR;
- write_lock(&ls->ls_recv_active);
+ write_lock_bh(&ls->ls_recv_active);
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
if (ls->ls_recover_seq == seq) {
set_bit(LSFL_RUNNING, &ls->ls_flags);
/* unblocks processes waiting to enter the dlm */
@@ -42,9 +42,9 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
error = 0;
}
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
- write_unlock(&ls->ls_recv_active);
+ write_unlock_bh(&ls->ls_recv_active);
return error;
}
@@ -259,12 +259,12 @@ static void do_ls_recovery(struct dlm_ls *ls)
struct dlm_recover *rv = NULL;
int error;
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
rv = ls->ls_recover_args;
ls->ls_recover_args = NULL;
if (rv && ls->ls_recover_seq == rv->seq)
clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
if (rv) {
error = ls_recover(ls, rv);
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 8a57a2d70561..bc8f8a14ddf2 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -68,7 +68,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
struct dlm_message *ms;
int error = 0;
- write_lock(&ls->ls_requestqueue_lock);
+ write_lock_bh(&ls->ls_requestqueue_lock);
for (;;) {
if (list_empty(&ls->ls_requestqueue)) {
clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
@@ -97,7 +97,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
break;
}
}
- write_unlock(&ls->ls_requestqueue_lock);
+ write_unlock_bh(&ls->ls_requestqueue_lock);
return error;
}
@@ -132,7 +132,7 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
struct dlm_message *ms;
struct rq_entry *e, *safe;
- write_lock(&ls->ls_requestqueue_lock);
+ write_lock_bh(&ls->ls_requestqueue_lock);
list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
ms = &e->request;
@@ -141,6 +141,6 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
kfree(e);
}
}
- write_unlock(&ls->ls_requestqueue_lock);
+ write_unlock_bh(&ls->ls_requestqueue_lock);
}
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 695e691b38b3..c0d35678ee54 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -206,7 +206,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
return;
ls = lkb->lkb_resource->res_ls;
- spin_lock(&ls->ls_clear_proc_locks);
+ spin_lock_bh(&ls->ls_clear_proc_locks);
/* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
can't be delivered. For ORPHAN's, dlm_clear_proc_locks() freed
@@ -228,12 +228,12 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
if ((flags & DLM_CB_CAST) && lkb_is_endoflife(mode, status))
set_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags);
- spin_lock(&proc->asts_spin);
+ spin_lock_bh(&proc->asts_spin);
rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
switch (rv) {
case DLM_ENQUEUE_CALLBACK_FAILURE:
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
WARN_ON_ONCE(1);
goto out;
case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
@@ -247,19 +247,19 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
WARN_ON_ONCE(1);
break;
}
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
/* N.B. spin_lock locks_spin, not asts_spin */
- spin_lock(&proc->locks_spin);
+ spin_lock_bh(&proc->locks_spin);
if (!list_empty(&lkb->lkb_ownqueue)) {
list_del_init(&lkb->lkb_ownqueue);
dlm_put_lkb(lkb);
}
- spin_unlock(&proc->locks_spin);
+ spin_unlock_bh(&proc->locks_spin);
}
out:
- spin_unlock(&ls->ls_clear_proc_locks);
+ spin_unlock_bh(&ls->ls_clear_proc_locks);
}
static int device_user_lock(struct dlm_user_proc *proc,
@@ -832,10 +832,10 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
return -EINVAL;
- spin_lock(&proc->asts_spin);
+ spin_lock_bh(&proc->asts_spin);
if (list_empty(&proc->asts)) {
if (file->f_flags & O_NONBLOCK) {
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
return -EAGAIN;
}
@@ -844,16 +844,16 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
repeat:
set_current_state(TASK_INTERRUPTIBLE);
if (list_empty(&proc->asts) && !signal_pending(current)) {
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
schedule();
- spin_lock(&proc->asts_spin);
+ spin_lock_bh(&proc->asts_spin);
goto repeat;
}
set_current_state(TASK_RUNNING);
remove_wait_queue(&proc->wait, &wait);
if (signal_pending(current)) {
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
return -ERESTARTSYS;
}
}
@@ -875,7 +875,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
*/
log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
list_del_init(&lkb->lkb_cb_list);
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
/* removes ref for proc->asts, may cause lkb to be freed */
dlm_put_lkb(lkb);
WARN_ON_ONCE(1);
@@ -890,7 +890,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
WARN_ON_ONCE(1);
break;
}
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
if (cb->flags & DLM_CB_BAST) {
trace_dlm_bast(lkb->lkb_resource->res_ls, lkb, cb->mode);
@@ -925,12 +925,12 @@ static __poll_t device_poll(struct file *file, poll_table *wait)
poll_wait(file, &proc->wait, wait);
- spin_lock(&proc->asts_spin);
+ spin_lock_bh(&proc->asts_spin);
if (!list_empty(&proc->asts)) {
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
return EPOLLIN | EPOLLRDNORM;
}
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
return 0;
}
--
2.31.1
* [Cluster-devel] [RFC dlm/next 10/10] fs: dlm: do dlm message processing in softirq context
2023-09-08 20:46 [Cluster-devel] [RFC dlm/next 01/10] fs: dlm: remove allocation parameter in msg allocation Alexander Aring
` (7 preceding siblings ...)
2023-09-08 20:46 ` [Cluster-devel] [RFC dlm/next 09/10] fs: dlm: convert message parsing locks to disable bh Alexander Aring
@ 2023-09-08 20:46 ` Alexander Aring
8 siblings, 0 replies; 10+ messages in thread
From: Alexander Aring @ 2023-09-08 20:46 UTC (permalink / raw)
To: teigland; +Cc: cluster-devel, gfs2
This patch changes the dlm message parsing context from a workqueue to
softirq context. This will hopefully speed up dlm message processing by
removing a bunch of implicit scheduling points; e.g. whether
cond_resched() actually schedules depends on the preemption model
setting. A softirq (except on PREEMPT_RT) can only be interrupted by
other softirqs or higher-priority contexts such as hardware interrupts.

This patch only moves the dlm message parsing to the right context;
there are more ideas to improve message parsing, such as using lockless
techniques for read access on data structures or enabling parallel
per-node message processing. Further patches will implement those
improvements. For now this patch reduces the number of interruptions
during DLM message parsing.
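As illustration, here is a minimal sketch of the workqueue-to-tasklet
pattern applied below (the demo_* names are hypothetical, not dlm
symbols; assumes the tasklet_struct-based callback API, v5.9+):

    #include <linux/interrupt.h>

    /* runs in softirq context: must not sleep, so any locks
     * shared with process context need the _bh variants */
    static void demo_handler(struct tasklet_struct *t)
    {
            /* drain the producer/consumer queue here */
    }

    static DECLARE_TASKLET(demo_tasklet, demo_handler);

    /* producer side, e.g. after queueing received data: */
    tasklet_schedule(&demo_tasklet);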
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 34 ++++++++++------------------------
1 file changed, 10 insertions(+), 24 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 28dd74aebc84..93f7e8827201 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -183,7 +183,6 @@ static int dlm_local_count;
/* Work queues */
static struct workqueue_struct *io_workqueue;
-static struct workqueue_struct *process_workqueue;
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
@@ -199,9 +198,9 @@ static const struct dlm_proto_ops *dlm_proto_ops;
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
-static void process_dlm_messages(struct work_struct *work);
+static void process_dlm_messages(struct tasklet_struct *tasklet);
-static DECLARE_WORK(process_work, process_dlm_messages);
+static DECLARE_TASKLET(process_tasklet, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
static atomic_t processqueue_count;
@@ -863,7 +862,7 @@ struct dlm_processed_nodes {
struct list_head list;
};
-static void process_dlm_messages(struct work_struct *work)
+static void process_dlm_messages(struct tasklet_struct *tasklet)
{
struct processqueue_entry *pentry;
@@ -971,7 +970,7 @@ static int receive_from_sock(struct connection *con, int buflen)
list_add_tail(&pentry->list, &processqueue);
if (!process_dlm_messages_pending) {
process_dlm_messages_pending = true;
- queue_work(process_workqueue, &process_work);
+ tasklet_schedule(&process_tasklet);
}
spin_unlock_bh(&processqueue_lock);
@@ -1511,7 +1510,8 @@ static void process_recv_sockets(struct work_struct *work)
/* CF_RECV_PENDING cleared */
break;
case DLM_IO_FLUSH:
- flush_workqueue(process_workqueue);
+ tasklet_disable(&process_tasklet);
+ tasklet_enable(&process_tasklet);
fallthrough;
case DLM_IO_RESCHED:
cond_resched();
@@ -1685,11 +1685,6 @@ static void work_stop(void)
destroy_workqueue(io_workqueue);
io_workqueue = NULL;
}
-
- if (process_workqueue) {
- destroy_workqueue(process_workqueue);
- process_workqueue = NULL;
- }
}
static int work_start(void)
@@ -1701,18 +1696,6 @@ static int work_start(void)
return -ENOMEM;
}
- /* ordered dlm message process queue,
- * should be converted to a tasklet
- */
- process_workqueue = alloc_ordered_workqueue("dlm_process",
- WQ_HIGHPRI | WQ_MEM_RECLAIM);
- if (!process_workqueue) {
- log_print("can't start dlm_process");
- destroy_workqueue(io_workqueue);
- io_workqueue = NULL;
- return -ENOMEM;
- }
-
return 0;
}
@@ -1734,7 +1717,10 @@ void dlm_lowcomms_shutdown(void)
hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
shutdown_connection(con, true);
stop_connection_io(con);
- flush_workqueue(process_workqueue);
+
+ tasklet_disable(&process_tasklet);
+ tasklet_enable(&process_tasklet);
+
close_connection(con, true);
clean_one_writequeue(con);
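A note on the flush replacement above (a sketch of the semantics, not
dlm code): flush_workqueue() waited for all queued work items to
complete, while the tasklet_disable()/tasklet_enable() pair only waits
until a currently executing handler instance has returned:

    /* spins until a handler running right now has finished;
     * work that is scheduled but not yet run stays pending and
     * executes again after tasklet_enable() */
    tasklet_disable(&process_tasklet);
    tasklet_enable(&process_tasklet);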
--
2.31.1