* [Cluster-devel] [RFC dlm/next 09/10] fs: dlm: convert message parsing locks to disable bh
From: Alexander Aring @ 2023-09-08 20:46 UTC
To: teigland; +Cc: cluster-devel, gfs2
This patch converts all spinlocks involved in message parsing to their
_bh variants. This prepares for converting message parsing to run in
softirq context: while any of these locks is held, softirqs must be
disabled so that the message-parsing softirq cannot interrupt the lock
holder and deadlock on the same lock (see the sketch below).
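For context, the deadlock this avoids can be shown with a minimal
sketch (the lock and function names here are illustrative, not taken
from the patch): if process context holds one of these locks with plain
spin_lock() and the message-parsing softirq fires on the same CPU and
tries to take the same lock, the CPU spins forever. spin_lock_bh()
disables bottom halves first, so the softirq cannot run until the lock
is released.

	/* Illustrative sketch only; example_lock is hypothetical. */
	static DEFINE_SPINLOCK(example_lock);

	/* process context */
	static void process_side(void)
	{
		spin_lock_bh(&example_lock);	/* disables softirqs on this CPU */
		/* ... touch state shared with message parsing ... */
		spin_unlock_bh(&example_lock);	/* softirqs may run again */
	}

	/* softirq context, as after patch 10/10 of this series */
	static void softirq_side(void)
	{
		/* Plain spin_lock() is sufficient inside the softirq
		 * itself; the deadlock only occurs when the process
		 * side fails to disable bottom halves while holding
		 * the lock.
		 */
		spin_lock(&example_lock);
		/* ... parse message ... */
		spin_unlock(&example_lock);
	}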
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/ast.c | 28 +++----
fs/dlm/debug_fs.c | 32 ++++----
fs/dlm/dir.c | 14 ++--
fs/dlm/lock.c | 182 ++++++++++++++++++++++--------------------
fs/dlm/lock.h | 4 +-
fs/dlm/lockspace.c | 51 ++++++------
fs/dlm/lowcomms.c | 16 ++--
fs/dlm/member.c | 22 ++---
fs/dlm/midcomms.c | 40 +++++-----
fs/dlm/rcom.c | 26 +++---
fs/dlm/recover.c | 84 +++++++++----------
fs/dlm/recoverd.c | 12 +--
fs/dlm/requestqueue.c | 8 +-
fs/dlm/user.c | 34 ++++----
14 files changed, 281 insertions(+), 272 deletions(-)
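One note on the two TODO hunks below: kref_put_lock() takes a plain
spinlock internally and no kref_put_lock_bh() exists yet, so put_rsb()
and __put_lkb() instead bracket the call with local_bh_disable()/
local_bh_enable(). The resulting pattern, simplified from the put_rsb()
hunk:

	/* TODO: replace with kref_put_lock_bh() once it exists */
	local_bh_disable();
	rv = kref_put_lock(&r->res_ref, toss_rsb,
			   &ls->ls_rsbtbl[bucket].lock);
	if (rv)	/* ref hit zero: toss_rsb() ran, lock is still held */
		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
	local_bh_enable();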
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 1f2f70a1b824..e3c0903aaa6f 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -127,19 +127,19 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
return;
}
- spin_lock(&lkb->lkb_cb_lock);
+ spin_lock_bh(&lkb->lkb_cb_lock);
rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
switch (rv) {
case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
kref_get(&lkb->lkb_ref);
- spin_lock(&ls->ls_cb_lock);
+ spin_lock_bh(&ls->ls_cb_lock);
if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
} else {
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
}
- spin_unlock(&ls->ls_cb_lock);
+ spin_unlock_bh(&ls->ls_cb_lock);
break;
case DLM_ENQUEUE_CALLBACK_FAILURE:
WARN_ON_ONCE(1);
@@ -150,7 +150,7 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
WARN_ON_ONCE(1);
break;
}
- spin_unlock(&lkb->lkb_cb_lock);
+ spin_unlock_bh(&lkb->lkb_cb_lock);
}
void dlm_callback_work(struct work_struct *work)
@@ -162,14 +162,14 @@ void dlm_callback_work(struct work_struct *work)
struct dlm_callback *cb;
int rv;
- spin_lock(&lkb->lkb_cb_lock);
+ spin_lock_bh(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) {
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
- spin_unlock(&lkb->lkb_cb_lock);
+ spin_unlock_bh(&lkb->lkb_cb_lock);
goto out;
}
- spin_unlock(&lkb->lkb_cb_lock);
+ spin_unlock_bh(&lkb->lkb_cb_lock);
for (;;) {
castfn = lkb->lkb_astfn;
@@ -190,14 +190,14 @@ void dlm_callback_work(struct work_struct *work)
kref_put(&cb->ref, dlm_release_callback);
- spin_lock(&lkb->lkb_cb_lock);
+ spin_lock_bh(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) {
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
- spin_unlock(&lkb->lkb_cb_lock);
+ spin_unlock_bh(&lkb->lkb_cb_lock);
break;
}
- spin_unlock(&lkb->lkb_cb_lock);
+ spin_unlock_bh(&lkb->lkb_cb_lock);
}
out:
@@ -225,9 +225,9 @@ void dlm_callback_stop(struct dlm_ls *ls)
void dlm_callback_suspend(struct dlm_ls *ls)
{
if (ls->ls_callback_wq) {
- spin_lock(&ls->ls_cb_lock);
+ spin_lock_bh(&ls->ls_cb_lock);
set_bit(LSFL_CB_DELAY, &ls->ls_flags);
- spin_unlock(&ls->ls_cb_lock);
+ spin_unlock_bh(&ls->ls_cb_lock);
flush_workqueue(ls->ls_callback_wq);
}
@@ -245,7 +245,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
return;
more:
- spin_lock(&ls->ls_cb_lock);
+ spin_lock_bh(&ls->ls_cb_lock);
list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
list_del_init(&lkb->lkb_cb_list);
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
@@ -256,7 +256,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
empty = list_empty(&ls->ls_cb_delay);
if (empty)
clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
- spin_unlock(&ls->ls_cb_lock);
+ spin_unlock_bh(&ls->ls_cb_lock);
sum += count;
if (!empty) {
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 9d726971ba47..5726278dc62a 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -373,7 +373,7 @@ static void print_format5_lock(struct seq_file *s, struct dlm_lkb *lkb)
/* lkb_id lkb_flags mode flags sb_status sb_flags */
- spin_lock(&lkb->lkb_cb_lock);
+ spin_lock_bh(&lkb->lkb_cb_lock);
list_for_each_entry(cb, &lkb->lkb_callbacks, list) {
seq_printf(s, "%x %x %d %x %d %x\n",
lkb->lkb_id,
@@ -383,7 +383,7 @@ static void print_format5_lock(struct seq_file *s, struct dlm_lkb *lkb)
cb->sb_status,
cb->sb_flags);
}
- spin_unlock(&lkb->lkb_cb_lock);
+ spin_unlock_bh(&lkb->lkb_cb_lock);
}
static void print_format5(struct dlm_rsb *r, struct seq_file *s)
@@ -509,7 +509,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
- spin_lock(&ls->ls_rsbtbl[bucket].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(tree)) {
for (node = rb_first(tree); node; node = rb_next(node)) {
r = rb_entry(node, struct dlm_rsb, res_hashnode);
@@ -517,12 +517,12 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
dlm_hold_rsb(r);
ri->rsb = r;
ri->bucket = bucket;
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
return ri;
}
}
}
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
/*
* move to the first rsb in the next non-empty bucket
@@ -541,18 +541,18 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
}
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
- spin_lock(&ls->ls_rsbtbl[bucket].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(tree)) {
node = rb_first(tree);
r = rb_entry(node, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
ri->bucket = bucket;
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
*pos = n;
return ri;
}
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
}
}
@@ -573,7 +573,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
* move to the next rsb in the same bucket
*/
- spin_lock(&ls->ls_rsbtbl[bucket].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
rp = ri->rsb;
next = rb_next(&rp->res_hashnode);
@@ -581,12 +581,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
r = rb_entry(next, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
dlm_put_rsb(rp);
++*pos;
return ri;
}
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
dlm_put_rsb(rp);
/*
@@ -607,18 +607,18 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
}
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
- spin_lock(&ls->ls_rsbtbl[bucket].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(tree)) {
next = rb_first(tree);
r = rb_entry(next, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
ri->bucket = bucket;
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
*pos = n;
return ri;
}
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
}
}
@@ -824,7 +824,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
size_t len = DLM_DEBUG_BUF_LEN, pos = 0, ret, rv;
mutex_lock(&debug_buf_lock);
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
memset(debug_buf, 0, sizeof(debug_buf));
list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
@@ -835,7 +835,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
break;
pos += ret;
}
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
mutex_unlock(&debug_buf_lock);
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index c70e286f3dbc..7974296ff305 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -51,11 +51,11 @@ void dlm_recover_dir_nodeid(struct dlm_ls *ls)
{
struct dlm_rsb *r;
- read_lock(&ls->ls_root_lock);
+ read_lock_bh(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
}
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
}
int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
@@ -216,16 +216,16 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
if (!rv)
return r;
- read_lock(&ls->ls_root_lock);
+ read_lock_bh(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (len == r->res_length && !memcmp(name, r->res_name, len)) {
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
log_debug(ls, "find_rsb_root revert to root_list %s",
r->res_name);
return r;
}
}
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
return NULL;
}
@@ -241,7 +241,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
int offset = 0, dir_nodeid;
__be16 be_namelen;
- read_lock(&ls->ls_root_lock);
+ read_lock_bh(&ls->ls_root_lock);
if (inlen > 1) {
r = find_rsb_root(ls, inbuf, inlen);
@@ -302,6 +302,6 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
ls->ls_recover_dir_sent_msg++;
}
out:
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
}
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index dccc0b888ca1..1d2d6a246441 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -342,10 +342,15 @@ static void put_rsb(struct dlm_rsb *r)
uint32_t bucket = r->res_bucket;
int rv;
+ /* TODO we need a kref_put_lock_bh(), however this
+ * is only an optimization.
+ */
+ local_bh_disable();
rv = kref_put_lock(&r->res_ref, toss_rsb,
&ls->ls_rsbtbl[bucket].lock);
if (rv)
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ local_bh_enable();
}
void dlm_put_rsb(struct dlm_rsb *r)
@@ -358,17 +363,17 @@ static int pre_rsb_struct(struct dlm_ls *ls)
struct dlm_rsb *r1, *r2;
int count = 0;
- spin_lock(&ls->ls_new_rsb_spin);
+ spin_lock_bh(&ls->ls_new_rsb_spin);
if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
- spin_unlock(&ls->ls_new_rsb_spin);
+ spin_unlock_bh(&ls->ls_new_rsb_spin);
return 0;
}
- spin_unlock(&ls->ls_new_rsb_spin);
+ spin_unlock_bh(&ls->ls_new_rsb_spin);
r1 = dlm_allocate_rsb(ls);
r2 = dlm_allocate_rsb(ls);
- spin_lock(&ls->ls_new_rsb_spin);
+ spin_lock_bh(&ls->ls_new_rsb_spin);
if (r1) {
list_add(&r1->res_hashchain, &ls->ls_new_rsb);
ls->ls_new_rsb_count++;
@@ -378,7 +383,7 @@ static int pre_rsb_struct(struct dlm_ls *ls)
ls->ls_new_rsb_count++;
}
count = ls->ls_new_rsb_count;
- spin_unlock(&ls->ls_new_rsb_spin);
+ spin_unlock_bh(&ls->ls_new_rsb_spin);
if (!count)
return -ENOMEM;
@@ -395,10 +400,10 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
struct dlm_rsb *r;
int count;
- spin_lock(&ls->ls_new_rsb_spin);
+ spin_lock_bh(&ls->ls_new_rsb_spin);
if (list_empty(&ls->ls_new_rsb)) {
count = ls->ls_new_rsb_count;
- spin_unlock(&ls->ls_new_rsb_spin);
+ spin_unlock_bh(&ls->ls_new_rsb_spin);
log_debug(ls, "find_rsb retry %d %d %s",
count, dlm_config.ci_new_rsb_count,
(const char *)name);
@@ -410,7 +415,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
/* Convert the empty list_head to a NULL rb_node for tree usage: */
memset(&r->res_hashnode, 0, sizeof(struct rb_node));
ls->ls_new_rsb_count--;
- spin_unlock(&ls->ls_new_rsb_spin);
+ spin_unlock_bh(&ls->ls_new_rsb_spin);
r->res_ls = ls;
r->res_length = len;
@@ -584,7 +589,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
goto out;
}
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (error)
@@ -654,7 +659,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
error = get_rsb_struct(ls, name, len, &r);
if (error == -EAGAIN) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
goto retry;
}
if (error)
@@ -703,7 +708,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
out_add:
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
out_unlock:
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
out:
*r_ret = r;
return error;
@@ -728,7 +733,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
if (error < 0)
goto out;
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (error)
@@ -786,7 +791,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
error = get_rsb_struct(ls, name, len, &r);
if (error == -EAGAIN) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
goto retry;
}
if (error)
@@ -801,7 +806,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
out_unlock:
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
out:
*r_ret = r;
return error;
@@ -1018,7 +1023,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
if (error < 0)
return error;
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (!error) {
/* because the rsb is active, we need to lock_rsb before
@@ -1026,7 +1031,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
*/
hold_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
lock_rsb(r);
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
@@ -1052,14 +1057,14 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
r->res_toss_time = jiffies;
/* the rsb was inactive (on toss list) */
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return 0;
not_found:
error = get_rsb_struct(ls, name, len, &r);
if (error == -EAGAIN) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
goto retry;
}
if (error)
@@ -1077,7 +1082,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
if (error) {
/* should never happen */
dlm_free_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
goto retry;
}
@@ -1085,7 +1090,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
*result = DLM_LU_ADD;
*r_nodeid = from_nodeid;
out_unlock:
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return error;
}
@@ -1096,13 +1101,13 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
int i;
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
- spin_lock(&ls->ls_rsbtbl[i].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (r->res_hash == hash)
dlm_dump_rsb(r);
}
- spin_unlock(&ls->ls_rsbtbl[i].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
}
}
@@ -1115,7 +1120,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (!error)
goto out_dump;
@@ -1126,7 +1131,7 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
out_dump:
dlm_dump_rsb(r);
out:
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
}
static void toss_rsb(struct kref *kref)
@@ -1208,11 +1213,11 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
spin_lock_init(&lkb->lkb_cb_lock);
INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
- spin_lock(&ls->ls_lkbidr_spin);
+ spin_lock_bh(&ls->ls_lkbidr_spin);
rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
if (rv >= 0)
lkb->lkb_id = rv;
- spin_unlock(&ls->ls_lkbidr_spin);
+ spin_unlock_bh(&ls->ls_lkbidr_spin);
if (rv < 0) {
log_error(ls, "create_lkb idr error %d", rv);
@@ -1233,11 +1238,11 @@ static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
struct dlm_lkb *lkb;
- spin_lock(&ls->ls_lkbidr_spin);
+ spin_lock_bh(&ls->ls_lkbidr_spin);
lkb = idr_find(&ls->ls_lkbidr, lkid);
if (lkb)
kref_get(&lkb->lkb_ref);
- spin_unlock(&ls->ls_lkbidr_spin);
+ spin_unlock_bh(&ls->ls_lkbidr_spin);
*lkb_ret = lkb;
return lkb ? 0 : -ENOENT;
@@ -1261,6 +1266,10 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
uint32_t lkid = lkb->lkb_id;
int rv;
+ /* TODO we need a kref_put_lock_bh(), however this
+ * is only an optimization.
+ */
+ local_bh_disable();
rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
&ls->ls_lkbidr_spin);
if (rv) {
@@ -1274,6 +1283,7 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
dlm_free_lvb(lkb->lkb_lvbptr);
dlm_free_lkb(lkb);
}
+ local_bh_enable();
return rv;
}
@@ -1407,7 +1417,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
int error = 0;
int wc;
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
if (is_overlap_unlock(lkb) ||
(is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
@@ -1447,7 +1457,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
log_error(ls, "addwait error %x %d flags %x %d %d %s",
lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
lkb->lkb_wait_type, lkb->lkb_resource->res_name);
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
return error;
}
@@ -1546,9 +1556,9 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error;
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
error = _remove_from_waiters(lkb, mstype, NULL);
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
return error;
}
@@ -1563,10 +1573,10 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
int error;
if (!local)
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
if (!local)
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
return error;
}
@@ -1582,10 +1592,10 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return;
}
@@ -1642,7 +1652,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
else
clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
/*
* While searching for rsb's to free, we found some that require
@@ -1657,16 +1667,16 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
name = ls->ls_remove_names[i];
len = ls->ls_remove_lens[i];
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (rv) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
log_debug(ls, "remove_name not toss %s", name);
continue;
}
if (r->res_master_nodeid != our_nodeid) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
log_debug(ls, "remove_name master %d dir %d our %d %s",
r->res_master_nodeid, r->res_dir_nodeid,
our_nodeid, name);
@@ -1675,7 +1685,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
if (r->res_dir_nodeid == our_nodeid) {
/* should never happen */
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
log_error(ls, "remove_name dir %d master %d our %d %s",
r->res_dir_nodeid, r->res_master_nodeid,
our_nodeid, name);
@@ -1684,21 +1694,21 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
if (!time_after_eq(jiffies, r->res_toss_time +
dlm_config.ci_toss_secs * HZ)) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
log_debug(ls, "remove_name toss_time %lu now %lu %s",
r->res_toss_time, jiffies, name);
continue;
}
if (!kref_put(&r->res_ref, kill_rsb)) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
log_error(ls, "remove_name in use %s", name);
continue;
}
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
send_remove(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
dlm_free_rsb(r);
}
@@ -4163,7 +4173,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[b].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (rv) {
@@ -4173,7 +4183,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
/* should not happen */
log_error(ls, "receive_remove from %d not found %s",
from_nodeid, name);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return;
}
if (r->res_master_nodeid != from_nodeid) {
@@ -4181,14 +4191,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
log_error(ls, "receive_remove keep from %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return;
}
log_debug(ls, "receive_remove from %d master %d first %x %s",
from_nodeid, r->res_master_nodeid, r->res_first_lkid,
name);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return;
}
@@ -4196,19 +4206,19 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
log_error(ls, "receive_remove toss from %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
return;
}
if (kref_put(&r->res_ref, kill_rsb)) {
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
dlm_free_rsb(r);
} else {
log_error(ls, "receive_remove from %d rsb ref error",
from_nodeid);
dlm_print_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[b].lock);
}
}
@@ -4747,20 +4757,20 @@ static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
int nodeid)
{
try_again:
- read_lock(&ls->ls_requestqueue_lock);
+ read_lock_bh(&ls->ls_requestqueue_lock);
if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
/* If we were a member of this lockspace, left, and rejoined,
other nodes may still be sending us messages from the
lockspace generation before we left. */
if (WARN_ON_ONCE(!ls->ls_generation)) {
- read_unlock(&ls->ls_requestqueue_lock);
+ read_unlock_bh(&ls->ls_requestqueue_lock);
log_limit(ls, "receive %d from %d ignore old gen",
le32_to_cpu(ms->m_type), nodeid);
return;
}
- read_unlock(&ls->ls_requestqueue_lock);
- write_lock(&ls->ls_requestqueue_lock);
+ read_unlock_bh(&ls->ls_requestqueue_lock);
+ write_lock_bh(&ls->ls_requestqueue_lock);
/* recheck because we hold writelock now */
if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
write_unlock_bh(&ls->ls_requestqueue_lock);
@@ -4768,10 +4778,10 @@ static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
}
dlm_add_requestqueue(ls, nodeid, ms);
- write_unlock(&ls->ls_requestqueue_lock);
+ write_unlock_bh(&ls->ls_requestqueue_lock);
} else {
_receive_message(ls, ms, 0);
- read_unlock(&ls->ls_requestqueue_lock);
+ read_unlock_bh(&ls->ls_requestqueue_lock);
}
}
@@ -4831,7 +4841,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
be inactive (in this ls) before transitioning to recovery mode */
- read_lock(&ls->ls_recv_active);
+ read_lock_bh(&ls->ls_recv_active);
if (hd->h_cmd == DLM_MSG)
dlm_receive_message(ls, &p->message, nodeid);
else if (hd->h_cmd == DLM_RCOM)
@@ -4839,7 +4849,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
else
log_error(ls, "invalid h_cmd %d from %d lockspace %x",
hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
- read_unlock(&ls->ls_recv_active);
+ read_unlock_bh(&ls->ls_recv_active);
dlm_put_lockspace(ls);
}
@@ -4900,7 +4910,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
if (!ms_local)
return;
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
@@ -4993,7 +5003,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
lkb->lkb_wait_type, wait_type);
}
}
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
kfree(ms_local);
}
@@ -5001,7 +5011,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
{
struct dlm_lkb *lkb = NULL, *iter;
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
hold_lkb(iter);
@@ -5009,7 +5019,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
break;
}
}
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
return lkb;
}
@@ -5076,9 +5086,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
while (!atomic_dec_and_test(&lkb->lkb_wait_count))
unhold_lkb(lkb);
- spin_lock(&ls->ls_waiters_lock);
+ spin_lock_bh(&ls->ls_waiters_lock);
list_del_init(&lkb->lkb_wait_reply);
- spin_unlock(&ls->ls_waiters_lock);
+ spin_unlock_bh(&ls->ls_waiters_lock);
if (oc || ou) {
/* do an unlock or cancel instead of resending */
@@ -5216,7 +5226,7 @@ void dlm_recover_purge(struct dlm_ls *ls)
if (!nodes_count)
return;
- write_lock(&ls->ls_root_lock);
+ write_lock_bh(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
hold_rsb(r);
lock_rsb(r);
@@ -5231,7 +5241,7 @@ void dlm_recover_purge(struct dlm_ls *ls)
unlock_rsb(r);
unhold_rsb(r);
}
- write_unlock(&ls->ls_root_lock);
+ write_unlock_bh(&ls->ls_root_lock);
if (lkb_count)
log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
@@ -5243,7 +5253,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
struct rb_node *n;
struct dlm_rsb *r;
- spin_lock(&ls->ls_rsbtbl[bucket].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
@@ -5254,10 +5264,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
continue;
}
hold_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
return r;
}
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
return NULL;
}
@@ -5601,10 +5611,10 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
}
/* add this new lkb to the per-process list of locks */
- spin_lock(&ua->proc->locks_spin);
+ spin_lock_bh(&ua->proc->locks_spin);
hold_lkb(lkb);
list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
- spin_unlock(&ua->proc->locks_spin);
+ spin_unlock_bh(&ua->proc->locks_spin);
do_put = false;
out_put:
trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
@@ -5734,9 +5744,9 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
* for the proc locks list.
*/
- spin_lock(&ua->proc->locks_spin);
+ spin_lock_bh(&ua->proc->locks_spin);
list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
- spin_unlock(&ua->proc->locks_spin);
+ spin_unlock_bh(&ua->proc->locks_spin);
out:
kfree(ua_tmp);
return rv;
@@ -5780,11 +5790,11 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
if (error)
goto out_put;
- spin_lock(&ua->proc->locks_spin);
+ spin_lock_bh(&ua->proc->locks_spin);
/* dlm_user_add_cb() may have already taken lkb off the proc list */
if (!list_empty(&lkb->lkb_ownqueue))
list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
- spin_unlock(&ua->proc->locks_spin);
+ spin_unlock_bh(&ua->proc->locks_spin);
out_put:
trace_dlm_unlock_end(ls, lkb, flags, error);
dlm_put_lkb(lkb);
@@ -5935,7 +5945,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
{
struct dlm_lkb *lkb = NULL;
- spin_lock(&ls->ls_clear_proc_locks);
+ spin_lock_bh(&ls->ls_clear_proc_locks);
if (list_empty(&proc->locks))
goto out;
@@ -5947,7 +5957,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
else
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
out:
- spin_unlock(&ls->ls_clear_proc_locks);
+ spin_unlock_bh(&ls->ls_clear_proc_locks);
return lkb;
}
@@ -5983,7 +5993,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
dlm_put_lkb(lkb);
}
- spin_lock(&ls->ls_clear_proc_locks);
+ spin_lock_bh(&ls->ls_clear_proc_locks);
/* in-progress unlocks */
list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
@@ -5998,7 +6008,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
dlm_put_lkb(lkb);
}
- spin_unlock(&ls->ls_clear_proc_locks);
+ spin_unlock_bh(&ls->ls_clear_proc_locks);
dlm_unlock_recovery(ls);
}
@@ -6008,13 +6018,13 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
while (1) {
lkb = NULL;
- spin_lock(&proc->locks_spin);
+ spin_lock_bh(&proc->locks_spin);
if (!list_empty(&proc->locks)) {
lkb = list_entry(proc->locks.next, struct dlm_lkb,
lkb_ownqueue);
list_del_init(&lkb->lkb_ownqueue);
}
- spin_unlock(&proc->locks_spin);
+ spin_unlock_bh(&proc->locks_spin);
if (!lkb)
break;
@@ -6024,21 +6034,21 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
dlm_put_lkb(lkb); /* ref from proc->locks list */
}
- spin_lock(&proc->locks_spin);
+ spin_lock_bh(&proc->locks_spin);
list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
list_del_init(&lkb->lkb_ownqueue);
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
dlm_put_lkb(lkb);
}
- spin_unlock(&proc->locks_spin);
+ spin_unlock_bh(&proc->locks_spin);
- spin_lock(&proc->asts_spin);
+ spin_lock_bh(&proc->asts_spin);
list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
dlm_purge_lkb_callbacks(lkb);
list_del_init(&lkb->lkb_cb_list);
dlm_put_lkb(lkb);
}
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
}
/* pid of 0 means purge all orphans */
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 1428fa3482d7..ac9fb73f5445 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -68,12 +68,12 @@ static inline int is_master(struct dlm_rsb *r)
static inline void lock_rsb(struct dlm_rsb *r)
{
- spin_lock(&r->res_lock);
+ spin_lock_bh(&r->res_lock);
}
static inline void unlock_rsb(struct dlm_rsb *r)
{
- spin_unlock(&r->res_lock);
+ spin_unlock_bh(&r->res_lock);
}
#endif
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index e35ea06200b5..a1d04175c4bf 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -251,15 +251,15 @@ static struct dlm_ls *find_ls_to_scan(void)
{
struct dlm_ls *ls;
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (time_after_eq(jiffies, ls->ls_scan_time +
dlm_config.ci_scan_secs * HZ)) {
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
return ls;
}
}
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
return NULL;
}
@@ -306,7 +306,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
struct dlm_ls *ls;
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (ls->ls_global_id == id) {
@@ -316,7 +316,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
}
ls = NULL;
out:
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
return ls;
}
@@ -324,7 +324,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
struct dlm_ls *ls;
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (ls->ls_local_handle == lockspace) {
atomic_inc(&ls->ls_count);
@@ -333,7 +333,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
}
ls = NULL;
out:
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
return ls;
}
@@ -341,7 +341,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor)
{
struct dlm_ls *ls;
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (ls->ls_device.minor == minor) {
atomic_inc(&ls->ls_count);
@@ -350,7 +350,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor)
}
ls = NULL;
out:
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
return ls;
}
@@ -365,15 +365,15 @@ static void remove_lockspace(struct dlm_ls *ls)
retry:
wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
if (atomic_read(&ls->ls_count) != 0) {
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
goto retry;
}
WARN_ON(ls->ls_create_count != 0);
list_del(&ls->ls_list);
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
}
static int threads_start(void)
@@ -448,7 +448,7 @@ static int new_lockspace(const char *name, const char *cluster,
error = 0;
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
WARN_ON(ls->ls_create_count <= 0);
if (ls->ls_namelen != namelen)
@@ -464,7 +464,7 @@ static int new_lockspace(const char *name, const char *cluster,
error = 1;
break;
}
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
if (error)
goto out;
@@ -581,10 +581,10 @@ static int new_lockspace(const char *name, const char *cluster,
INIT_LIST_HEAD(&ls->ls_root_list);
rwlock_init(&ls->ls_root_lock);
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
ls->ls_create_count = 1;
list_add(&ls->ls_list, &lslist);
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
if (flags & DLM_LSFL_FS) {
error = dlm_callback_start(ls);
@@ -653,9 +653,9 @@ static int new_lockspace(const char *name, const char *cluster,
out_callback:
dlm_callback_stop(ls);
out_delist:
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_del(&ls->ls_list);
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
idr_destroy(&ls->ls_recover_idr);
kfree(ls->ls_recover_buf);
out_lkbidr:
@@ -754,7 +754,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
{
int rv;
- spin_lock(&ls->ls_lkbidr_spin);
+ spin_lock_bh(&ls->ls_lkbidr_spin);
if (force == 0) {
rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
} else if (force == 1) {
@@ -762,7 +762,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
} else {
rv = 0;
}
- spin_unlock(&ls->ls_lkbidr_spin);
+ spin_unlock_bh(&ls->ls_lkbidr_spin);
return rv;
}
@@ -774,7 +774,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
busy = lockspace_busy(ls, force);
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
if (ls->ls_create_count == 1) {
if (busy) {
rv = -EBUSY;
@@ -788,7 +788,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
} else {
rv = -EINVAL;
}
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
if (rv) {
log_debug(ls, "release_lockspace no remove %d", rv);
@@ -916,20 +916,19 @@ void dlm_stop_lockspaces(void)
restart:
count = 0;
- spin_lock(&lslist_lock);
+ spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
count++;
continue;
}
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
log_error(ls, "no userland control daemon, stopping lockspace");
dlm_ls_stop(ls);
goto restart;
}
- spin_unlock(&lslist_lock);
+ spin_unlock_bh(&lslist_lock);
if (count)
log_print("dlm user daemon left %d lockspaces", count);
}
-
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index db71982d709d..28dd74aebc84 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -867,36 +867,36 @@ static void process_dlm_messages(struct work_struct *work)
{
struct processqueue_entry *pentry;
- spin_lock(&processqueue_lock);
+ spin_lock_bh(&processqueue_lock);
pentry = list_first_entry_or_null(&processqueue,
struct processqueue_entry, list);
if (WARN_ON_ONCE(!pentry)) {
process_dlm_messages_pending = false;
- spin_unlock(&processqueue_lock);
+ spin_unlock_bh(&processqueue_lock);
return;
}
list_del(&pentry->list);
atomic_dec(&processqueue_count);
- spin_unlock(&processqueue_lock);
+ spin_unlock_bh(&processqueue_lock);
for (;;) {
dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
pentry->buflen);
free_processqueue_entry(pentry);
- spin_lock(&processqueue_lock);
+ spin_lock_bh(&processqueue_lock);
pentry = list_first_entry_or_null(&processqueue,
struct processqueue_entry, list);
if (!pentry) {
process_dlm_messages_pending = false;
- spin_unlock(&processqueue_lock);
+ spin_unlock_bh(&processqueue_lock);
break;
}
list_del(&pentry->list);
atomic_dec(&processqueue_count);
- spin_unlock(&processqueue_lock);
+ spin_unlock_bh(&processqueue_lock);
}
}
@@ -966,14 +966,14 @@ static int receive_from_sock(struct connection *con, int buflen)
memmove(con->rx_leftover_buf, pentry->buf + ret,
con->rx_leftover);
- spin_lock(&processqueue_lock);
+ spin_lock_bh(&processqueue_lock);
ret = atomic_inc_return(&processqueue_count);
list_add_tail(&pentry->list, &processqueue);
if (!process_dlm_messages_pending) {
process_dlm_messages_pending = true;
queue_work(process_workqueue, &process_work);
}
- spin_unlock(&processqueue_lock);
+ spin_unlock_bh(&processqueue_lock);
if (ret > DLM_MAX_PROCESS_BUFFERS)
return DLM_IO_FLUSH;
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index ac1b555af9d6..6401916a97ef 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -630,7 +630,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
* message to the requestqueue without races.
*/
- write_lock(&ls->ls_recv_active);
+ write_lock_bh(&ls->ls_recv_active);
/*
* Abort any recovery that's in progress (see RECOVER_STOP,
@@ -638,23 +638,23 @@ int dlm_ls_stop(struct dlm_ls *ls)
* dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
*/
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
ls->ls_recover_seq++;
/* activate requestqueue and stop processing */
- write_lock(&ls->ls_requestqueue_lock);
+ write_lock_bh(&ls->ls_requestqueue_lock);
set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
- write_unlock(&ls->ls_requestqueue_lock);
- spin_unlock(&ls->ls_recover_lock);
+ write_unlock_bh(&ls->ls_requestqueue_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
/*
* Let dlm_recv run again, now any normal messages will be saved on the
* requestqueue for later.
*/
- write_unlock(&ls->ls_recv_active);
+ write_unlock_bh(&ls->ls_recv_active);
/*
* This in_recovery lock does two things:
@@ -679,13 +679,13 @@ int dlm_ls_stop(struct dlm_ls *ls)
dlm_recoverd_suspend(ls);
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
kfree(ls->ls_slots);
ls->ls_slots = NULL;
ls->ls_num_slots = 0;
ls->ls_slots_size = 0;
ls->ls_recover_status = 0;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
dlm_recoverd_resume(ls);
@@ -719,12 +719,12 @@ int dlm_ls_start(struct dlm_ls *ls)
if (error < 0)
goto fail_rv;
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
/* the lockspace needs to be stopped before it can be started */
if (!dlm_locking_stopped(ls)) {
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
log_error(ls, "start ignored: lockspace running");
error = -EINVAL;
goto fail;
@@ -735,7 +735,7 @@ int dlm_ls_start(struct dlm_ls *ls)
rv->seq = ++ls->ls_recover_seq;
rv_old = ls->ls_recover_args;
ls->ls_recover_args = rv;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
if (rv_old) {
log_error(ls, "unused recovery %llx %d",
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index ea0559e2a44d..7d75f86450d6 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -365,9 +365,9 @@ int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
node->users = 0;
midcomms_node_reset(node);
- spin_lock(&nodes_lock);
+ spin_lock_bh(&nodes_lock);
hlist_add_head_rcu(&node->hlist, &node_hash[r]);
- spin_unlock(&nodes_lock);
+ spin_unlock_bh(&nodes_lock);
node->debugfs = dlm_create_debug_comms_file(nodeid, node);
return 0;
@@ -478,7 +478,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
{
- spin_lock(&node->state_lock);
+ spin_lock_bh(&node->state_lock);
pr_debug("receive passive fin ack from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
@@ -492,13 +492,13 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
wake_up(&node->shutdown_wait);
break;
default:
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
}
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
}
static void dlm_receive_buffer_3_2_trace(uint32_t seq,
@@ -535,7 +535,7 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
if (is_expected_seq) {
switch (p->header.h_cmd) {
case DLM_FIN:
- spin_lock(&node->state_lock);
+ spin_lock_bh(&node->state_lock);
pr_debug("receive fin msg from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
@@ -576,13 +576,13 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
/* probably remove_member caught it, do nothing */
break;
default:
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
}
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
break;
default:
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
@@ -1183,7 +1183,7 @@ void dlm_midcomms_exit(void)
static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
{
- spin_lock(&node->state_lock);
+ spin_lock_bh(&node->state_lock);
pr_debug("receive active fin ack from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
@@ -1203,13 +1203,13 @@ static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
wake_up(&node->shutdown_wait);
break;
default:
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
}
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
}
void dlm_midcomms_add_member(int nodeid)
@@ -1224,7 +1224,7 @@ void dlm_midcomms_add_member(int nodeid)
return;
}
- spin_lock(&node->state_lock);
+ spin_lock_bh(&node->state_lock);
if (!node->users) {
pr_debug("receive add member from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
@@ -1252,7 +1252,7 @@ void dlm_midcomms_add_member(int nodeid)
node->users++;
pr_debug("node %d users inc count %d\n", nodeid, node->users);
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
}
@@ -1270,13 +1270,13 @@ void dlm_midcomms_remove_member(int nodeid)
return;
}
- spin_lock(&node->state_lock);
+ spin_lock_bh(&node->state_lock);
/* case of dlm_midcomms_addr() created node but
* was not added before because dlm_midcomms_close()
* removed the node
*/
if (!node->users) {
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
return;
}
@@ -1314,7 +1314,7 @@ void dlm_midcomms_remove_member(int nodeid)
break;
}
}
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
}
@@ -1352,7 +1352,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
return;
}
- spin_lock(&node->state_lock);
+ spin_lock_bh(&node->state_lock);
pr_debug("receive active shutdown for node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
switch (node->state) {
@@ -1371,7 +1371,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
*/
break;
}
- spin_unlock(&node->state_lock);
+ spin_unlock_bh(&node->state_lock);
if (DLM_DEBUG_FENCE_TERMINATION)
msleep(5000);
@@ -1442,9 +1442,9 @@ int dlm_midcomms_close(int nodeid)
ret = dlm_lowcomms_close(nodeid);
dlm_delete_debug_comms_file(node->debugfs);
- spin_lock(&nodes_lock);
+ spin_lock_bh(&nodes_lock);
hlist_del_rcu(&node->hlist);
- spin_unlock(&nodes_lock);
+ spin_unlock_bh(&nodes_lock);
srcu_read_unlock(&nodes_srcu, idx);
/* wait that all readers left until flush send queue */
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 2e3f529f3ff2..be1a71a6303a 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -143,18 +143,18 @@ static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq)
{
- spin_lock(&ls->ls_rcom_spin);
+ spin_lock_bh(&ls->ls_rcom_spin);
*new_seq = cpu_to_le64(++ls->ls_rcom_seq);
set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
- spin_unlock(&ls->ls_rcom_spin);
+ spin_unlock_bh(&ls->ls_rcom_spin);
}
static void disallow_sync_reply(struct dlm_ls *ls)
{
- spin_lock(&ls->ls_rcom_spin);
+ spin_lock_bh(&ls->ls_rcom_spin);
clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
- spin_unlock(&ls->ls_rcom_spin);
+ spin_unlock_bh(&ls->ls_rcom_spin);
}
/*
@@ -245,10 +245,10 @@ static void receive_rcom_status(struct dlm_ls *ls,
goto do_create;
}
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
status = ls->ls_recover_status;
num_slots = ls->ls_num_slots;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
len += num_slots * sizeof(struct rcom_slot);
do_create:
@@ -266,9 +266,9 @@ static void receive_rcom_status(struct dlm_ls *ls,
if (!num_slots)
goto do_send;
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
if (ls->ls_num_slots != num_slots) {
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
log_debug(ls, "receive_rcom_status num_slots %d to %d",
num_slots, ls->ls_num_slots);
rc->rc_result = 0;
@@ -277,7 +277,7 @@ static void receive_rcom_status(struct dlm_ls *ls,
}
dlm_slots_copy_out(ls, rc);
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
do_send:
send_rcom_stateless(msg, rc);
@@ -285,7 +285,7 @@ static void receive_rcom_status(struct dlm_ls *ls,
static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
{
- spin_lock(&ls->ls_rcom_spin);
+ spin_lock_bh(&ls->ls_rcom_spin);
if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) {
log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
@@ -301,7 +301,7 @@ static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
wake_up(&ls->ls_wait_general);
out:
- spin_unlock(&ls->ls_rcom_spin);
+ spin_unlock_bh(&ls->ls_rcom_spin);
}
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
@@ -613,11 +613,11 @@ void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid)
break;
}
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
status = ls->ls_recover_status;
stop = dlm_recovery_stopped(ls);
seq = ls->ls_recover_seq;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS)))
goto ignore;
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 0d5b0f94eb46..64d94e6ea8fb 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -74,9 +74,9 @@ int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
uint32_t dlm_recover_status(struct dlm_ls *ls)
{
uint32_t status;
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
status = ls->ls_recover_status;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
return status;
}
@@ -87,9 +87,9 @@ static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
{
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
_set_recover_status(ls, status);
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
}
static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
@@ -188,13 +188,13 @@ int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq)
rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
if (!rv) {
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
_set_recover_status(ls, DLM_RS_NODES_ALL);
ls->ls_num_slots = num_slots;
ls->ls_slots_size = slots_size;
ls->ls_slots = slots;
ls->ls_generation = gen;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
} else {
dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
}
@@ -241,9 +241,9 @@ static int recover_list_empty(struct dlm_ls *ls)
{
int empty;
- spin_lock(&ls->ls_recover_list_lock);
+ spin_lock_bh(&ls->ls_recover_list_lock);
empty = list_empty(&ls->ls_recover_list);
- spin_unlock(&ls->ls_recover_list_lock);
+ spin_unlock_bh(&ls->ls_recover_list_lock);
return empty;
}
@@ -252,23 +252,23 @@ static void recover_list_add(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
- spin_lock(&ls->ls_recover_list_lock);
+ spin_lock_bh(&ls->ls_recover_list_lock);
if (list_empty(&r->res_recover_list)) {
list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
ls->ls_recover_list_count++;
dlm_hold_rsb(r);
}
- spin_unlock(&ls->ls_recover_list_lock);
+ spin_unlock_bh(&ls->ls_recover_list_lock);
}
static void recover_list_del(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
- spin_lock(&ls->ls_recover_list_lock);
+ spin_lock_bh(&ls->ls_recover_list_lock);
list_del_init(&r->res_recover_list);
ls->ls_recover_list_count--;
- spin_unlock(&ls->ls_recover_list_lock);
+ spin_unlock_bh(&ls->ls_recover_list_lock);
dlm_put_rsb(r);
}
@@ -277,7 +277,7 @@ static void recover_list_clear(struct dlm_ls *ls)
{
struct dlm_rsb *r, *s;
- spin_lock(&ls->ls_recover_list_lock);
+ spin_lock_bh(&ls->ls_recover_list_lock);
list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
list_del_init(&r->res_recover_list);
r->res_recover_locks_count = 0;
@@ -290,17 +290,17 @@ static void recover_list_clear(struct dlm_ls *ls)
ls->ls_recover_list_count);
ls->ls_recover_list_count = 0;
}
- spin_unlock(&ls->ls_recover_list_lock);
+ spin_unlock_bh(&ls->ls_recover_list_lock);
}
static int recover_idr_empty(struct dlm_ls *ls)
{
int empty = 1;
- spin_lock(&ls->ls_recover_idr_lock);
+ spin_lock_bh(&ls->ls_recover_idr_lock);
if (ls->ls_recover_list_count)
empty = 0;
- spin_unlock(&ls->ls_recover_idr_lock);
+ spin_unlock_bh(&ls->ls_recover_idr_lock);
return empty;
}
@@ -310,7 +310,7 @@ static int recover_idr_add(struct dlm_rsb *r)
struct dlm_ls *ls = r->res_ls;
int rv;
- spin_lock(&ls->ls_recover_idr_lock);
+ spin_lock_bh(&ls->ls_recover_idr_lock);
if (r->res_id) {
rv = -1;
goto out_unlock;
@@ -324,7 +324,7 @@ static int recover_idr_add(struct dlm_rsb *r)
dlm_hold_rsb(r);
rv = 0;
out_unlock:
- spin_unlock(&ls->ls_recover_idr_lock);
+ spin_unlock_bh(&ls->ls_recover_idr_lock);
return rv;
}
@@ -332,11 +332,11 @@ static void recover_idr_del(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
- spin_lock(&ls->ls_recover_idr_lock);
+ spin_lock_bh(&ls->ls_recover_idr_lock);
idr_remove(&ls->ls_recover_idr, r->res_id);
r->res_id = 0;
ls->ls_recover_list_count--;
- spin_unlock(&ls->ls_recover_idr_lock);
+ spin_unlock_bh(&ls->ls_recover_idr_lock);
dlm_put_rsb(r);
}
@@ -345,9 +345,9 @@ static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
{
struct dlm_rsb *r;
- spin_lock(&ls->ls_recover_idr_lock);
+ spin_lock_bh(&ls->ls_recover_idr_lock);
r = idr_find(&ls->ls_recover_idr, (int)id);
- spin_unlock(&ls->ls_recover_idr_lock);
+ spin_unlock_bh(&ls->ls_recover_idr_lock);
return r;
}
@@ -356,7 +356,7 @@ static void recover_idr_clear(struct dlm_ls *ls)
struct dlm_rsb *r;
int id;
- spin_lock(&ls->ls_recover_idr_lock);
+ spin_lock_bh(&ls->ls_recover_idr_lock);
idr_for_each_entry(&ls->ls_recover_idr, r, id) {
idr_remove(&ls->ls_recover_idr, id);
@@ -372,7 +372,7 @@ static void recover_idr_clear(struct dlm_ls *ls)
ls->ls_recover_list_count);
ls->ls_recover_list_count = 0;
}
- spin_unlock(&ls->ls_recover_idr_lock);
+ spin_unlock_bh(&ls->ls_recover_idr_lock);
}
@@ -529,10 +529,10 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
log_rinfo(ls, "dlm_recover_masters");
- read_lock(&ls->ls_root_lock);
+ read_lock_bh(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (dlm_recovery_stopped(ls)) {
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
error = -EINTR;
goto out;
}
@@ -546,11 +546,11 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
total++;
if (error) {
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
goto out;
}
}
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);
@@ -660,7 +660,7 @@ int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
struct dlm_rsb *r;
int error, count = 0;
- read_lock(&ls->ls_root_lock);
+ read_lock_bh(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (is_master(r)) {
rsb_clear_flag(r, RSB_NEW_MASTER);
@@ -672,19 +672,19 @@ int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
if (dlm_recovery_stopped(ls)) {
error = -EINTR;
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
goto out;
}
error = recover_locks(r, seq);
if (error) {
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
goto out;
}
count += r->res_recover_locks_count;
}
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
log_rinfo(ls, "dlm_recover_locks %d out", count);
@@ -858,7 +858,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
struct dlm_rsb *r;
unsigned int count = 0;
- read_lock(&ls->ls_root_lock);
+ read_lock_bh(&ls->ls_root_lock);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
lock_rsb(r);
if (is_master(r)) {
@@ -880,7 +880,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
rsb_clear_flag(r, RSB_NEW_MASTER2);
unlock_rsb(r);
}
- read_unlock(&ls->ls_root_lock);
+ read_unlock_bh(&ls->ls_root_lock);
if (count)
log_rinfo(ls, "dlm_recover_rsbs %d done", count);
@@ -894,7 +894,7 @@ int dlm_create_root_list(struct dlm_ls *ls)
struct dlm_rsb *r;
int i, error = 0;
- write_lock(&ls->ls_root_lock);
+ write_lock_bh(&ls->ls_root_lock);
if (!list_empty(&ls->ls_root_list)) {
log_error(ls, "root list not empty");
error = -EINVAL;
@@ -902,7 +902,7 @@ int dlm_create_root_list(struct dlm_ls *ls)
}
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
- spin_lock(&ls->ls_rsbtbl[i].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
list_add(&r->res_root_list, &ls->ls_root_list);
@@ -911,10 +911,10 @@ int dlm_create_root_list(struct dlm_ls *ls)
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
log_error(ls, "dlm_create_root_list toss not empty");
- spin_unlock(&ls->ls_rsbtbl[i].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
}
out:
- write_unlock(&ls->ls_root_lock);
+ write_unlock_bh(&ls->ls_root_lock);
return error;
}
@@ -922,12 +922,12 @@ void dlm_release_root_list(struct dlm_ls *ls)
{
struct dlm_rsb *r, *safe;
- write_lock(&ls->ls_root_lock);
+ write_lock_bh(&ls->ls_root_lock);
list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
list_del_init(&r->res_root_list);
dlm_put_rsb(r);
}
- write_unlock(&ls->ls_root_lock);
+ write_unlock_bh(&ls->ls_root_lock);
}
void dlm_clear_toss(struct dlm_ls *ls)
@@ -938,7 +938,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
int i;
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
- spin_lock(&ls->ls_rsbtbl[i].lock);
+ spin_lock_bh(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
next = rb_next(n);
r = rb_entry(n, struct dlm_rsb, res_hashnode);
@@ -946,7 +946,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
dlm_free_rsb(r);
count++;
}
- spin_unlock(&ls->ls_rsbtbl[i].lock);
+ spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
}
if (count)
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index c47bcc8be398..a774ebbf6ccb 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -32,9 +32,9 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
int error = -EINTR;
- write_lock(&ls->ls_recv_active);
+ write_lock_bh(&ls->ls_recv_active);
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
if (ls->ls_recover_seq == seq) {
set_bit(LSFL_RUNNING, &ls->ls_flags);
/* unblocks processes waiting to enter the dlm */
@@ -42,9 +42,9 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
error = 0;
}
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
- write_unlock(&ls->ls_recv_active);
+ write_unlock_bh(&ls->ls_recv_active);
return error;
}
@@ -259,12 +259,12 @@ static void do_ls_recovery(struct dlm_ls *ls)
struct dlm_recover *rv = NULL;
int error;
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
rv = ls->ls_recover_args;
ls->ls_recover_args = NULL;
if (rv && ls->ls_recover_seq == rv->seq)
clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
if (rv) {
error = ls_recover(ls, rv);
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 8a57a2d70561..bc8f8a14ddf2 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -68,7 +68,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
struct dlm_message *ms;
int error = 0;
- write_lock(&ls->ls_requestqueue_lock);
+ write_lock_bh(&ls->ls_requestqueue_lock);
for (;;) {
if (list_empty(&ls->ls_requestqueue)) {
clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
@@ -97,7 +97,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
break;
}
}
- write_unlock(&ls->ls_requestqueue_lock);
+ write_unlock_bh(&ls->ls_requestqueue_lock);
return error;
}
@@ -132,7 +132,7 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
struct dlm_message *ms;
struct rq_entry *e, *safe;
- write_lock(&ls->ls_requestqueue_lock);
+ write_lock_bh(&ls->ls_requestqueue_lock);
list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
ms = &e->request;
@@ -141,6 +141,6 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
kfree(e);
}
}
- write_unlock(&ls->ls_requestqueue_lock);
+ write_unlock_bh(&ls->ls_requestqueue_lock);
}
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 695e691b38b3..c0d35678ee54 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -206,7 +206,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
return;
ls = lkb->lkb_resource->res_ls;
- spin_lock(&ls->ls_clear_proc_locks);
+ spin_lock_bh(&ls->ls_clear_proc_locks);
/* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
can't be delivered. For ORPHAN's, dlm_clear_proc_locks() freed
@@ -228,12 +228,12 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
if ((flags & DLM_CB_CAST) && lkb_is_endoflife(mode, status))
set_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags);
- spin_lock(&proc->asts_spin);
+ spin_lock_bh(&proc->asts_spin);
rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
switch (rv) {
case DLM_ENQUEUE_CALLBACK_FAILURE:
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
WARN_ON_ONCE(1);
goto out;
case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
@@ -247,19 +247,19 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
WARN_ON_ONCE(1);
break;
}
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
/* N.B. spin_lock locks_spin, not asts_spin */
- spin_lock(&proc->locks_spin);
+ spin_lock_bh(&proc->locks_spin);
if (!list_empty(&lkb->lkb_ownqueue)) {
list_del_init(&lkb->lkb_ownqueue);
dlm_put_lkb(lkb);
}
- spin_unlock(&proc->locks_spin);
+ spin_unlock_bh(&proc->locks_spin);
}
out:
- spin_unlock(&ls->ls_clear_proc_locks);
+ spin_unlock_bh(&ls->ls_clear_proc_locks);
}
static int device_user_lock(struct dlm_user_proc *proc,
@@ -832,10 +832,10 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
return -EINVAL;
- spin_lock(&proc->asts_spin);
+ spin_lock_bh(&proc->asts_spin);
if (list_empty(&proc->asts)) {
if (file->f_flags & O_NONBLOCK) {
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
return -EAGAIN;
}
@@ -844,16 +844,16 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
repeat:
set_current_state(TASK_INTERRUPTIBLE);
if (list_empty(&proc->asts) && !signal_pending(current)) {
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
schedule();
- spin_lock(&proc->asts_spin);
+ spin_lock_bh(&proc->asts_spin);
goto repeat;
}
set_current_state(TASK_RUNNING);
remove_wait_queue(&proc->wait, &wait);
if (signal_pending(current)) {
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
return -ERESTARTSYS;
}
}
@@ -875,7 +875,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
*/
log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
list_del_init(&lkb->lkb_cb_list);
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
/* removes ref for proc->asts, may cause lkb to be freed */
dlm_put_lkb(lkb);
WARN_ON_ONCE(1);
@@ -890,7 +890,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
WARN_ON_ONCE(1);
break;
}
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
if (cb->flags & DLM_CB_BAST) {
trace_dlm_bast(lkb->lkb_resource->res_ls, lkb, cb->mode);
@@ -925,12 +925,12 @@ static __poll_t device_poll(struct file *file, poll_table *wait)
poll_wait(file, &proc->wait, wait);
- spin_lock(&proc->asts_spin);
+ spin_lock_bh(&proc->asts_spin);
if (!list_empty(&proc->asts)) {
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
return EPOLLIN | EPOLLRDNORM;
}
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
return 0;
}
--
2.31.1