* [Cluster-devel] [RFC PATCH dlm/next 2/2] fs: dlm: change dlm logic locking behaviour
2021-10-04 20:58 [Cluster-devel] [RFC PATCH dlm/next 0/2] fs: dlm: remove reverse locking order Alexander Aring
2021-10-04 20:58 ` [Cluster-devel] [RFC PATCH dlm/next 1/2] fs: dlm: provide some lockless functionality Alexander Aring
@ 2021-10-04 20:58 ` Alexander Aring
1 sibling, 0 replies; 3+ messages in thread
From: Alexander Aring @ 2021-10-04 20:58 UTC (permalink / raw)
To: cluster-devel.redhat.com
This patch improves the locking behaviour inside the central dlm locking
logic. The current behaviour is that ls_waiters_mutex and res_mutex are
sometimes taken in reverse order. This reverse order can occur between
the locking functionality dlm_lock()/dlm_unlock() and the recovery
handling in dlm_recover_waiters_pre().
The reverse order is not the only problem. DLM has an internal flag,
DLM_IFL_STUB_MS, to signal a different lock order between
ls_waiters_mutex and res_mutex, so that ls_waiters_mutex will not be
taken again when it is already held in dlm_recover_waiters_pre().
To unwind this behaviour we remove ls_waiters_mutex and replace it with
rcu handling. To serialize against the central dlm locking logic running
concurrently, this patch introduces a per-lkb mutex with its lock_lkb()
and unlock_lkb() helpers, similar to the existing rsb lock.
Not having to hold ls_waiters_mutex before iterating the ls_waiters list
allows us to avoid the reverse lock order entirely: we use the internal
lockless variants of the central dlm locking logic and take the required
lkb/rsb locks in each iteration before calling into the dlm locking
functionality. As a benefit the per-lockspace ls_waiters_mutex becomes a
per-lkb lock.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/debug_fs.c | 7 +-
fs/dlm/dlm_internal.h | 5 +-
fs/dlm/lock.c | 190 ++++++++++++++++++++++++++----------------
fs/dlm/lock.h | 10 +++
fs/dlm/lockspace.c | 9 +-
fs/dlm/recover.c | 5 ++
6 files changed, 147 insertions(+), 79 deletions(-)
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 47e9d57e4cae..d51da780ea12 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -704,12 +704,13 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
struct dlm_ls *ls = file->private_data;
struct dlm_lkb *lkb;
size_t len = DLM_DEBUG_BUF_LEN, pos = 0, ret, rv;
+ int idx;
mutex_lock(&debug_buf_lock);
- mutex_lock(&ls->ls_waiters_mutex);
memset(debug_buf, 0, sizeof(debug_buf));
- list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ idx = srcu_read_lock(&ls->ls_lkb_srcu);
+ list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
ret = snprintf(debug_buf + pos, len - pos, "%x %d %d %s\n",
lkb->lkb_id, lkb->lkb_wait_type,
lkb->lkb_nodeid, lkb->lkb_resource->res_name);
@@ -717,7 +718,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
break;
pos += ret;
}
- mutex_unlock(&ls->ls_waiters_mutex);
+ srcu_read_unlock(&ls->ls_lkb_srcu, idx);
rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
mutex_unlock(&debug_buf_lock);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 3fb610dfea19..4ac0d513d307 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -279,6 +279,8 @@ struct dlm_lkb {
void *lkb_astparam; /* caller's ast arg */
struct dlm_user_args *lkb_ua;
};
+ struct mutex lkb_mutex;
+ struct rcu_head rcu;
};
/*
@@ -568,7 +570,8 @@ struct dlm_ls {
struct dlm_rsbtable *ls_rsbtbl;
uint32_t ls_rsbtbl_size;
- struct mutex ls_waiters_mutex;
+ struct srcu_struct ls_lkb_srcu;
+ spinlock_t ls_waiters_lock;
struct list_head ls_waiters; /* lkbs needing a reply */
struct mutex ls_orphans_mutex;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index c419a5b5b7c0..78c7addcb3c8 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1199,6 +1199,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
INIT_LIST_HEAD(&lkb->lkb_cb_list);
mutex_init(&lkb->lkb_cb_mutex);
INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
+ mutex_init(&lkb->lkb_mutex);
idr_preload(GFP_NOFS);
spin_lock(&ls->ls_lkbidr_spin);
@@ -1242,6 +1243,20 @@ static void kill_lkb(struct kref *kref)
DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
+
+static void _dlm_free_lkb(struct rcu_head *rcu)
+{
+ struct dlm_lkb *lkb = container_of(rcu, struct dlm_lkb, rcu);
+
+ detach_lkb(lkb);
+
+ /* for local/process lkbs, lvbptr points to caller's lksb */
+ if (lkb->lkb_lvbptr && is_master_copy(lkb))
+ dlm_free_lvb(lkb->lkb_lvbptr);
+
+ dlm_free_lkb(lkb);
+}
+
/* __put_lkb() is used when an lkb may not have an rsb attached to
it so we need to provide the lockspace explicitly */
@@ -1254,12 +1269,7 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
idr_remove(&ls->ls_lkbidr, lkid);
spin_unlock(&ls->ls_lkbidr_spin);
- detach_lkb(lkb);
-
- /* for local/process lkbs, lvbptr points to caller's lksb */
- if (lkb->lkb_lvbptr && is_master_copy(lkb))
- dlm_free_lvb(lkb->lkb_lvbptr);
- dlm_free_lkb(lkb);
+ call_srcu(&ls->ls_lkb_srcu, &lkb->rcu, _dlm_free_lkb);
return 1;
} else {
spin_unlock(&ls->ls_lkbidr_spin);
@@ -1402,22 +1412,27 @@ void dlm_scan_waiters(struct dlm_ls *ls)
u32 debug_expired = 0;
int num_nodes = 0;
int *warned = NULL;
+ int idx;
if (!dlm_config.ci_waitwarn_us)
return;
- mutex_lock(&ls->ls_waiters_mutex);
-
- list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
- if (!lkb->lkb_wait_time)
+ idx = srcu_read_lock(&ls->ls_lkb_srcu);
+ list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ lock_lkb(lkb);
+ if (!lkb->lkb_wait_time) {
+ unlock_lkb(lkb);
continue;
+ }
debug_scanned++;
us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
- if (us < dlm_config.ci_waitwarn_us)
+ if (us < dlm_config.ci_waitwarn_us) {
+ unlock_lkb(lkb);
continue;
+ }
lkb->lkb_wait_time = 0;
@@ -1429,16 +1444,21 @@ void dlm_scan_waiters(struct dlm_ls *ls)
num_nodes = ls->ls_num_nodes;
warned = kcalloc(num_nodes, sizeof(int), GFP_KERNEL);
}
- if (!warned)
+ if (!warned) {
+ unlock_lkb(lkb);
continue;
- if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
+ }
+ if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned)) {
+ unlock_lkb(lkb);
continue;
+ }
log_error(ls, "waitwarn %x %lld %d us check connection to "
"node %d", lkb->lkb_id, (long long)us,
dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
+ unlock_lkb(lkb);
}
- mutex_unlock(&ls->ls_waiters_mutex);
+ srcu_read_unlock(&ls->ls_lkb_srcu, idx);
kfree(warned);
if (debug_expired)
@@ -1455,8 +1475,6 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error = 0;
- mutex_lock(&ls->ls_waiters_mutex);
-
if (is_overlap_unlock(lkb) ||
(is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
error = -EINVAL;
@@ -1493,13 +1511,14 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
lkb->lkb_wait_time = ktime_get();
lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
hold_lkb(lkb);
- list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
+ spin_lock(&ls->ls_waiters_lock);
+ list_add_rcu(&lkb->lkb_wait_reply, &ls->ls_waiters);
+ spin_unlock(&ls->ls_waiters_lock);
out:
if (error)
log_error(ls, "addwait error %x %d flags %x %d %d %s",
lkb->lkb_id, error, lkb->lkb_flags, mstype,
lkb->lkb_wait_type, lkb->lkb_resource->res_name);
- mutex_unlock(&ls->ls_waiters_mutex);
return error;
}
@@ -1587,37 +1606,23 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
lkb->lkb_flags &= ~DLM_IFL_RESEND;
lkb->lkb_wait_count--;
- if (!lkb->lkb_wait_count)
- list_del_init(&lkb->lkb_wait_reply);
+ if (!lkb->lkb_wait_count) {
+ spin_lock(&ls->ls_waiters_lock);
+ list_del_rcu(&lkb->lkb_wait_reply);
+ spin_unlock(&ls->ls_waiters_lock);
+ }
unhold_lkb(lkb);
return 0;
}
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
- struct dlm_ls *ls = lkb->lkb_resource->res_ls;
- int error;
-
- mutex_lock(&ls->ls_waiters_mutex);
- error = _remove_from_waiters(lkb, mstype, NULL);
- mutex_unlock(&ls->ls_waiters_mutex);
- return error;
+ return _remove_from_waiters(lkb, mstype, NULL);
}
-/* Handles situations where we might be processing a "fake" or "stub" reply in
- which we can't try to take waiters_mutex again. */
-
static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
- struct dlm_ls *ls = lkb->lkb_resource->res_ls;
- int error;
-
- if (ms->m_flags != DLM_IFL_STUB_MS)
- mutex_lock(&ls->ls_waiters_mutex);
- error = _remove_from_waiters(lkb, ms->m_type, ms);
- if (ms->m_flags != DLM_IFL_STUB_MS)
- mutex_unlock(&ls->ls_waiters_mutex);
- return error;
+ return _remove_from_waiters(lkb, ms->m_type, ms);
}
/* If there's an rsb for the same resource being removed, ensure
@@ -1925,6 +1930,7 @@ void dlm_adjust_timeouts(struct dlm_ls *ls)
{
struct dlm_lkb *lkb;
u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
+ int idx;
ls->ls_recover_begin = 0;
mutex_lock(&ls->ls_timeout_mutex);
@@ -1935,12 +1941,14 @@ void dlm_adjust_timeouts(struct dlm_ls *ls)
if (!dlm_config.ci_waitwarn_us)
return;
- mutex_lock(&ls->ls_waiters_mutex);
- list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ idx = srcu_read_lock(&ls->ls_lkb_srcu);
+ list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ lock_lkb(lkb);
if (ktime_to_us(lkb->lkb_wait_time))
lkb->lkb_wait_time = ktime_get();
+ unlock_lkb(lkb);
}
- mutex_unlock(&ls->ls_waiters_mutex);
+ srcu_read_unlock(&ls->ls_lkb_srcu, idx);
}
/* lkb is master or local copy */
@@ -3328,6 +3336,7 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
if (error)
return error;
+ lock_lkb(lkb);
lock_rsb(r);
attach_lkb(r, lkb);
@@ -3337,6 +3346,7 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
return error;
}
@@ -3346,6 +3356,7 @@ static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_rsb *r;
int error;
+ lock_lkb(lkb);
r = lkb->lkb_resource;
hold_rsb(r);
@@ -3359,6 +3370,7 @@ static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
out:
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
return error;
}
@@ -3368,6 +3380,7 @@ static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_rsb *r;
int error;
+ lock_lkb(lkb);
r = lkb->lkb_resource;
hold_rsb(r);
@@ -3381,6 +3394,7 @@ static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
out:
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
return error;
}
@@ -3390,6 +3404,7 @@ static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_rsb *r;
int error;
+ lock_lkb(lkb);
r = lkb->lkb_resource;
hold_rsb(r);
@@ -3403,6 +3418,7 @@ static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
out:
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
return error;
}
@@ -4661,7 +4677,10 @@ static void _receive_convert_reply_lockless(struct dlm_lkb *lkb,
static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
- struct dlm_rsb *r = lkb->lkb_resource;
+ struct dlm_rsb *r;
+
+ lock_lkb(lkb);
+ r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
@@ -4670,6 +4689,7 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
}
static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -4719,7 +4739,10 @@ static void _receive_unlock_reply_lockless(struct dlm_lkb *lkb,
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
- struct dlm_rsb *r = lkb->lkb_resource;
+ struct dlm_rsb *r;
+
+ lock_lkb(lkb);
+ r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
@@ -4728,6 +4751,7 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
}
static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -4777,7 +4801,10 @@ static void _receive_cancel_reply_lockless(struct dlm_lkb *lkb,
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
- struct dlm_rsb *r = lkb->lkb_resource;
+ struct dlm_rsb *r;
+
+ lock_lkb(lkb);
+ r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
@@ -4786,6 +4813,7 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
}
static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -4818,7 +4846,9 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
/* ms->m_result is the value returned by dlm_master_lookup on dir node
FIXME: will a non-zero error ever be returned? */
+ lock_lkb(lkb);
r = lkb->lkb_resource;
+
hold_rsb(r);
lock_rsb(r);
@@ -4877,6 +4907,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
out:
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
dlm_put_lkb(lkb);
}
@@ -5096,7 +5127,8 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
}
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
- struct dlm_message *ms_stub)
+ struct dlm_message *ms_stub,
+ struct dlm_rsb *r)
{
if (middle_conversion(lkb)) {
hold_lkb(lkb);
@@ -5105,11 +5137,11 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
ms_stub->m_result = -EINPROGRESS;
ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
- _receive_convert_reply(lkb, ms_stub);
+ _receive_convert_reply_lockless(lkb, ms_stub, r);
/* Same special case as in receive_rcom_lock_args() */
lkb->lkb_grmode = DLM_LOCK_IV;
- rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
+ rsb_set_flag(r, RSB_RECOVER_CONVERT);
unhold_lkb(lkb);
} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
@@ -5143,28 +5175,23 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
- struct dlm_lkb *lkb, *safe;
struct dlm_message *ms_stub;
int wait_type, stub_unlock_result, stub_cancel_result;
- int dir_nodeid;
+ struct dlm_lkb *lkb;
+ int dir_nodeid, idx;
+ struct dlm_rsb *r;
ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
if (!ms_stub)
return;
- /* we do mutex_lock_nested() here in a different lockclass to avoid
- * a false positive report about a possible circular lock dependency.
- * The lockclass should be the same but doing it avoids the false
- * positive report. This report is about a different lock order
- * regarding to ls_waiters_mutex lock and res_mutex of lkb between here
- * and during lock operations. However lock operations cannot run in
- * parallel with dlm_recover_waiters_pre() because certain locks are
- * held which DLM_ASSERT_OPS_LOCKED(ls) is checking on.
- */
- DLM_ASSERT_OPS_LOCKED(ls);
- mutex_lock_nested(&ls->ls_waiters_mutex, 1);
+ idx = srcu_read_lock(&ls->ls_lkb_srcu);
+ list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ lock_lkb(lkb);
+ r = lkb->lkb_resource;
- list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
+ hold_rsb(r);
+ lock_rsb(r);
dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
@@ -5188,11 +5215,18 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
lkb->lkb_flags |= DLM_IFL_RESEND;
+ unlock_rsb(r);
+ put_rsb(r);
+ unlock_lkb(lkb);
continue;
}
- if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
+ if (!waiter_needs_recovery(ls, lkb, dir_nodeid)) {
+ unlock_rsb(r);
+ put_rsb(r);
+ unlock_lkb(lkb);
continue;
+ }
wait_type = lkb->lkb_wait_type;
stub_unlock_result = -DLM_EUNLOCK;
@@ -5227,7 +5261,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
break;
case DLM_MSG_CONVERT:
- recover_convert_waiter(ls, lkb, ms_stub);
+ recover_convert_waiter(ls, lkb, ms_stub, r);
break;
case DLM_MSG_UNLOCK:
@@ -5237,7 +5271,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
ms_stub->m_result = stub_unlock_result;
ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
- _receive_unlock_reply(lkb, ms_stub);
+ _receive_unlock_reply_lockless(lkb, ms_stub, r);
dlm_put_lkb(lkb);
break;
@@ -5248,7 +5282,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
ms_stub->m_result = stub_cancel_result;
ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
- _receive_cancel_reply(lkb, ms_stub);
+ _receive_cancel_reply_lockless(lkb, ms_stub, r);
dlm_put_lkb(lkb);
break;
@@ -5256,9 +5290,13 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
log_error(ls, "invalid lkb wait_type %d %d",
lkb->lkb_wait_type, wait_type);
}
+ unlock_rsb(r);
+ put_rsb(r);
+ unlock_lkb(lkb);
schedule();
}
- mutex_unlock(&ls->ls_waiters_mutex);
+ srcu_read_unlock(&ls->ls_lkb_srcu, idx);
+
kfree(ms_stub);
}
@@ -5267,15 +5305,15 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
struct dlm_lkb *lkb;
int found = 0;
- mutex_lock(&ls->ls_waiters_mutex);
- list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ lock_lkb(lkb);
if (lkb->lkb_flags & DLM_IFL_RESEND) {
hold_lkb(lkb);
found = 1;
break;
}
+ unlock_lkb(lkb);
}
- mutex_unlock(&ls->ls_waiters_mutex);
if (!found)
lkb = NULL;
@@ -5302,8 +5340,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
- int error = 0, mstype, err, oc, ou;
+ int error = 0, mstype, err, oc, ou, idx;
+ idx = srcu_read_lock(&ls->ls_lkb_srcu);
while (1) {
if (dlm_locking_stopped(ls)) {
log_debug(ls, "recover_waiters_post aborted");
@@ -5339,9 +5378,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
lkb->lkb_wait_type = 0;
lkb->lkb_wait_count = 0;
- mutex_lock(&ls->ls_waiters_mutex);
- list_del_init(&lkb->lkb_wait_reply);
- mutex_unlock(&ls->ls_waiters_mutex);
+ spin_lock(&ls->ls_waiters_lock);
+ list_del_rcu(&lkb->lkb_wait_reply);
+ spin_unlock(&ls->ls_waiters_lock);
unhold_lkb(lkb); /* for waiters list */
if (oc || ou) {
@@ -5385,11 +5424,14 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
"dir_nodeid %d overlap %d %d",
lkb->lkb_id, mstype, r->res_nodeid,
dlm_dir_nodeid(r), oc, ou);
+ unlock_lkb(lkb);
}
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
dlm_put_lkb(lkb);
}
+ srcu_read_unlock(&ls->ls_lkb_srcu, idx);
return error;
}
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 456c6ec3ef6f..8029aa96e47c 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -74,5 +74,15 @@ static inline void unlock_rsb(struct dlm_rsb *r)
mutex_unlock(&r->res_mutex);
}
+static inline void lock_lkb(struct dlm_lkb *lkb)
+{
+ mutex_lock(&lkb->lkb_mutex);
+}
+
+static inline void unlock_lkb(struct dlm_lkb *lkb)
+{
+ mutex_unlock(&lkb->lkb_mutex);
+}
+
#endif
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 2896f96cf2d3..e1199c41301f 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -524,7 +524,8 @@ static int new_lockspace(const char *name, const char *cluster,
spin_lock_init(&ls->ls_lkbidr_spin);
INIT_LIST_HEAD(&ls->ls_waiters);
- mutex_init(&ls->ls_waiters_mutex);
+ init_srcu_struct(&ls->ls_lkb_srcu);
+ spin_lock_init(&ls->ls_waiters_lock);
INIT_LIST_HEAD(&ls->ls_orphans);
mutex_init(&ls->ls_orphans_mutex);
INIT_LIST_HEAD(&ls->ls_timeout);
@@ -810,6 +811,12 @@ static int release_lockspace(struct dlm_ls *ls, int force)
idr_destroy(&ls->ls_recover_idr);
kfree(ls->ls_recover_buf);
+ /*
+ * wait all readers of ls_waiters and srcu callers are left before free
+ */
+ srcu_barrier(&ls->ls_lkb_srcu);
+ cleanup_srcu_struct(&ls->ls_lkb_srcu);
+
/*
* Free all lkb's in idr
*/
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 8928e99dfd47..ed5a248af641 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -939,6 +939,11 @@ void dlm_clear_toss(struct dlm_ls *ls)
unsigned int count = 0;
int i;
+ /* _dlm_free_lkb() called by ls_lkb_srcu will clear toss as well,
+ * so call srcu_barrier() to be sure all pending srcu are done
+ */
+ srcu_barrier(&ls->ls_lkb_srcu);
+
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
--
2.27.0
^ permalink raw reply related [flat|nested] 3+ messages in thread