[Cluster-devel] [RFC PATCH dlm/next 0/2] fs: dlm: remove reverse locking order
From: Alexander Aring @ 2021-10-04 20:58 UTC
To: cluster-devel.redhat.com
Hi,
This patch series is another attempt to remove ls_waiters_mutex and to improve the locking behaviour around dlm_recover_waiters_pre().

Instead of using res_mutex as the lock for some lkb fields, as an earlier attempt did, this series introduces a per-lkb lock, which makes it less complex to keep track of all the locks that need to be held.
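
For illustration, the intended lock ordering after this series is roughly the following; this is a sketch of the pattern used in the patches below, not a complete call path:

  /* locking path, e.g. convert_lock()/unlock_lock() */
  lock_lkb(lkb);          /* new per-lkb lock */
  r = lkb->lkb_resource;
  hold_rsb(r);
  lock_rsb(r);
  /* central dlm locking logic */
  unlock_rsb(r);
  put_rsb(r);
  unlock_lkb(lkb);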
- Alex
Alexander Aring (2):
fs: dlm: provide some lockless functionality
fs: dlm: change dlm logic locking behaviour
fs/dlm/debug_fs.c | 7 +-
fs/dlm/dlm_internal.h | 5 +-
fs/dlm/lock.c | 256 ++++++++++++++++++++++++++----------------
fs/dlm/lock.h | 10 ++
fs/dlm/lockspace.c | 9 +-
fs/dlm/recover.c | 5 +
6 files changed, 192 insertions(+), 100 deletions(-)
--
2.27.0
[Cluster-devel] [RFC PATCH dlm/next 1/2] fs: dlm: provide some lockless functionality
From: Alexander Aring @ 2021-10-04 20:58 UTC
To: cluster-devel.redhat.com
This patch provides lockless variants of some functions in the central dlm locking logic. They can be called internally when the necessary resource lock is already held by the caller.
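
A caller that already holds the rsb lock can then call the lockless variant directly; a minimal sketch of the intended use (the helper names match the diff below, the surrounding lines are illustrative only):

  hold_rsb(r);
  lock_rsb(r);
  /* rsb lock is held, so skip the locked wrapper */
  _receive_convert_reply_lockless(lkb, ms, r);
  unlock_rsb(r);
  put_rsb(r);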
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 72 ++++++++++++++++++++++++++++++++++-----------------
1 file changed, 48 insertions(+), 24 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 3d4eeab213da..c419a5b5b7c0 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -4641,25 +4641,33 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
}
}
-static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
+static void _receive_convert_reply_lockless(struct dlm_lkb *lkb,
+ struct dlm_message *ms,
+ struct dlm_rsb *r)
{
- struct dlm_rsb *r = lkb->lkb_resource;
int error;
- hold_rsb(r);
- lock_rsb(r);
-
error = validate_message(lkb, ms);
if (error)
- goto out;
+ return;
/* stub reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms);
if (error)
- goto out;
+ return;
__receive_convert_reply(r, lkb, ms);
- out:
+}
+
+static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
+{
+ struct dlm_rsb *r = lkb->lkb_resource;
+
+ hold_rsb(r);
+ lock_rsb(r);
+
+ _receive_convert_reply_lockless(lkb, ms, r);
+
unlock_rsb(r);
put_rsb(r);
}
@@ -4678,22 +4686,20 @@ static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
return 0;
}
-static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
+static void _receive_unlock_reply_lockless(struct dlm_lkb *lkb,
+ struct dlm_message *ms,
+ struct dlm_rsb *r)
{
- struct dlm_rsb *r = lkb->lkb_resource;
int error;
- hold_rsb(r);
- lock_rsb(r);
-
error = validate_message(lkb, ms);
if (error)
- goto out;
+ return;
/* stub reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms);
if (error)
- goto out;
+ return;
/* this is the value returned from do_unlock() on the master */
@@ -4709,7 +4715,17 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
log_error(r->res_ls, "receive_unlock_reply %x error %d",
lkb->lkb_id, ms->m_result);
}
- out:
+}
+
+static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
+{
+ struct dlm_rsb *r = lkb->lkb_resource;
+
+ hold_rsb(r);
+ lock_rsb(r);
+
+ _receive_unlock_reply_lockless(lkb, ms, r);
+
unlock_rsb(r);
put_rsb(r);
}
@@ -4728,22 +4744,20 @@ static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
return 0;
}
-static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
+static void _receive_cancel_reply_lockless(struct dlm_lkb *lkb,
+ struct dlm_message *ms,
+ struct dlm_rsb *r)
{
- struct dlm_rsb *r = lkb->lkb_resource;
int error;
- hold_rsb(r);
- lock_rsb(r);
-
error = validate_message(lkb, ms);
if (error)
- goto out;
+ return;
/* stub reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms);
if (error)
- goto out;
+ return;
/* this is the value returned from do_cancel() on the master */
@@ -4759,7 +4773,17 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
log_error(r->res_ls, "receive_cancel_reply %x error %d",
lkb->lkb_id, ms->m_result);
}
- out:
+}
+
+static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
+{
+ struct dlm_rsb *r = lkb->lkb_resource;
+
+ hold_rsb(r);
+ lock_rsb(r);
+
+ _receive_cancel_reply_lockless(lkb, ms, r);
+
unlock_rsb(r);
put_rsb(r);
}
--
2.27.0
[Cluster-devel] [RFC PATCH dlm/next 2/2] fs: dlm: change dlm logic locking behaviour
From: Alexander Aring @ 2021-10-04 20:58 UTC
To: cluster-devel.redhat.com
This patch improves the locking behaviour inside the central dlm locking logic. Currently ls_waiters_mutex and res_mutex are sometimes taken in reverse order. The reversed order can happen between the locking path dlm_lock()/dlm_unlock() and the recovery handling in dlm_recover_waiters_pre().

The reversed order is not the only problem. DLM also uses an internal flag, DLM_IFL_STUB_MS, to signal the different lock order between ls_waiters_mutex and res_mutex, so that ls_waiters_mutex is not taken again when it is already held in dlm_recover_waiters_pre().
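
In other words, the two paths end up taking the locks in opposite order; a simplified illustration, not the exact call chains:

  /* locking path, dlm_lock()/dlm_unlock() */
  lock_rsb(r);
  mutex_lock(&ls->ls_waiters_mutex);    /* add_to_waiters()/remove_from_waiters() */

  /* recovery path, dlm_recover_waiters_pre() */
  mutex_lock(&ls->ls_waiters_mutex);
  lock_rsb(r);                          /* inside _receive_*_reply(); ls_waiters_mutex is
                                           only skipped there because DLM_IFL_STUB_MS is set */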
To unwind this behaviour, remove ls_waiters_mutex and replace it with RCU handling. To serialize against the central dlm locking logic running concurrently, this patch introduces a per-lkb lock together with lock_lkb() and unlock_lkb() helpers, similar to the existing rsb lock.

Not having to hold ls_waiters_mutex while iterating the ls_waiters list means we never run into a different lock order: the iteration uses the internal lockless variants of the dlm locking logic and takes the required lkb/rsb locks in each iteration before calling into them. As a benefit, the per-lockspace ls_waiters_mutex becomes a per-lkb lock.
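
The resulting pattern for walking ls_waiters is roughly the following sketch, mirroring dlm_recover_waiters_pre() in the diff below:

  idx = srcu_read_lock(&ls->ls_lkb_srcu);
  list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
          lock_lkb(lkb);
          r = lkb->lkb_resource;
          hold_rsb(r);
          lock_rsb(r);

          /* lockless dlm locking logic, e.g.
           * _receive_unlock_reply_lockless(lkb, ms_stub, r);
           */

          unlock_rsb(r);
          put_rsb(r);
          unlock_lkb(lkb);
  }
  srcu_read_unlock(&ls->ls_lkb_srcu, idx);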
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/debug_fs.c | 7 +-
fs/dlm/dlm_internal.h | 5 +-
fs/dlm/lock.c | 190 ++++++++++++++++++++++++++----------------
fs/dlm/lock.h | 10 +++
fs/dlm/lockspace.c | 9 +-
fs/dlm/recover.c | 5 ++
6 files changed, 147 insertions(+), 79 deletions(-)
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 47e9d57e4cae..d51da780ea12 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -704,12 +704,13 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
struct dlm_ls *ls = file->private_data;
struct dlm_lkb *lkb;
size_t len = DLM_DEBUG_BUF_LEN, pos = 0, ret, rv;
+ int idx;
mutex_lock(&debug_buf_lock);
- mutex_lock(&ls->ls_waiters_mutex);
memset(debug_buf, 0, sizeof(debug_buf));
- list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ idx = srcu_read_lock(&ls->ls_lkb_srcu);
+ list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
ret = snprintf(debug_buf + pos, len - pos, "%x %d %d %s\n",
lkb->lkb_id, lkb->lkb_wait_type,
lkb->lkb_nodeid, lkb->lkb_resource->res_name);
@@ -717,7 +718,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
break;
pos += ret;
}
- mutex_unlock(&ls->ls_waiters_mutex);
+ srcu_read_unlock(&ls->ls_lkb_srcu, idx);
rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
mutex_unlock(&debug_buf_lock);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 3fb610dfea19..4ac0d513d307 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -279,6 +279,8 @@ struct dlm_lkb {
void *lkb_astparam; /* caller's ast arg */
struct dlm_user_args *lkb_ua;
};
+ struct mutex lkb_mutex;
+ struct rcu_head rcu;
};
/*
@@ -568,7 +570,8 @@ struct dlm_ls {
struct dlm_rsbtable *ls_rsbtbl;
uint32_t ls_rsbtbl_size;
- struct mutex ls_waiters_mutex;
+ struct srcu_struct ls_lkb_srcu;
+ spinlock_t ls_waiters_lock;
struct list_head ls_waiters; /* lkbs needing a reply */
struct mutex ls_orphans_mutex;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index c419a5b5b7c0..78c7addcb3c8 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1199,6 +1199,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
INIT_LIST_HEAD(&lkb->lkb_cb_list);
mutex_init(&lkb->lkb_cb_mutex);
INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
+ mutex_init(&lkb->lkb_mutex);
idr_preload(GFP_NOFS);
spin_lock(&ls->ls_lkbidr_spin);
@@ -1242,6 +1243,20 @@ static void kill_lkb(struct kref *kref)
DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
+
+static void _dlm_free_lkb(struct rcu_head *rcu)
+{
+ struct dlm_lkb *lkb = container_of(rcu, struct dlm_lkb, rcu);
+
+ detach_lkb(lkb);
+
+ /* for local/process lkbs, lvbptr points to caller's lksb */
+ if (lkb->lkb_lvbptr && is_master_copy(lkb))
+ dlm_free_lvb(lkb->lkb_lvbptr);
+
+ dlm_free_lkb(lkb);
+}
+
/* __put_lkb() is used when an lkb may not have an rsb attached to
it so we need to provide the lockspace explicitly */
@@ -1254,12 +1269,7 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
idr_remove(&ls->ls_lkbidr, lkid);
spin_unlock(&ls->ls_lkbidr_spin);
- detach_lkb(lkb);
-
- /* for local/process lkbs, lvbptr points to caller's lksb */
- if (lkb->lkb_lvbptr && is_master_copy(lkb))
- dlm_free_lvb(lkb->lkb_lvbptr);
- dlm_free_lkb(lkb);
+ call_srcu(&ls->ls_lkb_srcu, &lkb->rcu, _dlm_free_lkb);
return 1;
} else {
spin_unlock(&ls->ls_lkbidr_spin);
@@ -1402,22 +1412,27 @@ void dlm_scan_waiters(struct dlm_ls *ls)
u32 debug_expired = 0;
int num_nodes = 0;
int *warned = NULL;
+ int idx;
if (!dlm_config.ci_waitwarn_us)
return;
- mutex_lock(&ls->ls_waiters_mutex);
-
- list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
- if (!lkb->lkb_wait_time)
+ idx = srcu_read_lock(&ls->ls_lkb_srcu);
+ list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ lock_lkb(lkb);
+ if (!lkb->lkb_wait_time) {
+ unlock_lkb(lkb);
continue;
+ }
debug_scanned++;
us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
- if (us < dlm_config.ci_waitwarn_us)
+ if (us < dlm_config.ci_waitwarn_us) {
+ unlock_lkb(lkb);
continue;
+ }
lkb->lkb_wait_time = 0;
@@ -1429,16 +1444,21 @@ void dlm_scan_waiters(struct dlm_ls *ls)
num_nodes = ls->ls_num_nodes;
warned = kcalloc(num_nodes, sizeof(int), GFP_KERNEL);
}
- if (!warned)
+ if (!warned) {
+ unlock_lkb(lkb);
continue;
- if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
+ }
+ if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned)) {
+ unlock_lkb(lkb);
continue;
+ }
log_error(ls, "waitwarn %x %lld %d us check connection to "
"node %d", lkb->lkb_id, (long long)us,
dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
+ unlock_lkb(lkb);
}
- mutex_unlock(&ls->ls_waiters_mutex);
+ srcu_read_unlock(&ls->ls_lkb_srcu, idx);
kfree(warned);
if (debug_expired)
@@ -1455,8 +1475,6 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error = 0;
- mutex_lock(&ls->ls_waiters_mutex);
-
if (is_overlap_unlock(lkb) ||
(is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
error = -EINVAL;
@@ -1493,13 +1511,14 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
lkb->lkb_wait_time = ktime_get();
lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
hold_lkb(lkb);
- list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
+ spin_lock(&ls->ls_waiters_lock);
+ list_add_rcu(&lkb->lkb_wait_reply, &ls->ls_waiters);
+ spin_unlock(&ls->ls_waiters_lock);
out:
if (error)
log_error(ls, "addwait error %x %d flags %x %d %d %s",
lkb->lkb_id, error, lkb->lkb_flags, mstype,
lkb->lkb_wait_type, lkb->lkb_resource->res_name);
- mutex_unlock(&ls->ls_waiters_mutex);
return error;
}
@@ -1587,37 +1606,23 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
lkb->lkb_flags &= ~DLM_IFL_RESEND;
lkb->lkb_wait_count--;
- if (!lkb->lkb_wait_count)
- list_del_init(&lkb->lkb_wait_reply);
+ if (!lkb->lkb_wait_count) {
+ spin_lock(&ls->ls_waiters_lock);
+ list_del_rcu(&lkb->lkb_wait_reply);
+ spin_unlock(&ls->ls_waiters_lock);
+ }
unhold_lkb(lkb);
return 0;
}
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
- struct dlm_ls *ls = lkb->lkb_resource->res_ls;
- int error;
-
- mutex_lock(&ls->ls_waiters_mutex);
- error = _remove_from_waiters(lkb, mstype, NULL);
- mutex_unlock(&ls->ls_waiters_mutex);
- return error;
+ return _remove_from_waiters(lkb, mstype, NULL);
}
-/* Handles situations where we might be processing a "fake" or "stub" reply in
- which we can't try to take waiters_mutex again. */
-
static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
- struct dlm_ls *ls = lkb->lkb_resource->res_ls;
- int error;
-
- if (ms->m_flags != DLM_IFL_STUB_MS)
- mutex_lock(&ls->ls_waiters_mutex);
- error = _remove_from_waiters(lkb, ms->m_type, ms);
- if (ms->m_flags != DLM_IFL_STUB_MS)
- mutex_unlock(&ls->ls_waiters_mutex);
- return error;
+ return _remove_from_waiters(lkb, ms->m_type, ms);
}
/* If there's an rsb for the same resource being removed, ensure
@@ -1925,6 +1930,7 @@ void dlm_adjust_timeouts(struct dlm_ls *ls)
{
struct dlm_lkb *lkb;
u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
+ int idx;
ls->ls_recover_begin = 0;
mutex_lock(&ls->ls_timeout_mutex);
@@ -1935,12 +1941,14 @@ void dlm_adjust_timeouts(struct dlm_ls *ls)
if (!dlm_config.ci_waitwarn_us)
return;
- mutex_lock(&ls->ls_waiters_mutex);
- list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ idx = srcu_read_lock(&ls->ls_lkb_srcu);
+ list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ lock_lkb(lkb);
if (ktime_to_us(lkb->lkb_wait_time))
lkb->lkb_wait_time = ktime_get();
+ unlock_lkb(lkb);
}
- mutex_unlock(&ls->ls_waiters_mutex);
+ srcu_read_unlock(&ls->ls_lkb_srcu, idx);
}
/* lkb is master or local copy */
@@ -3328,6 +3336,7 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
if (error)
return error;
+ lock_lkb(lkb);
lock_rsb(r);
attach_lkb(r, lkb);
@@ -3337,6 +3346,7 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
return error;
}
@@ -3346,6 +3356,7 @@ static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_rsb *r;
int error;
+ lock_lkb(lkb);
r = lkb->lkb_resource;
hold_rsb(r);
@@ -3359,6 +3370,7 @@ static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
out:
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
return error;
}
@@ -3368,6 +3380,7 @@ static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_rsb *r;
int error;
+ lock_lkb(lkb);
r = lkb->lkb_resource;
hold_rsb(r);
@@ -3381,6 +3394,7 @@ static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
out:
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
return error;
}
@@ -3390,6 +3404,7 @@ static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_rsb *r;
int error;
+ lock_lkb(lkb);
r = lkb->lkb_resource;
hold_rsb(r);
@@ -3403,6 +3418,7 @@ static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
out:
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
return error;
}
@@ -4661,7 +4677,10 @@ static void _receive_convert_reply_lockless(struct dlm_lkb *lkb,
static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
- struct dlm_rsb *r = lkb->lkb_resource;
+ struct dlm_rsb *r;
+
+ lock_lkb(lkb);
+ r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
@@ -4670,6 +4689,7 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
}
static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -4719,7 +4739,10 @@ static void _receive_unlock_reply_lockless(struct dlm_lkb *lkb,
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
- struct dlm_rsb *r = lkb->lkb_resource;
+ struct dlm_rsb *r;
+
+ lock_lkb(lkb);
+ r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
@@ -4728,6 +4751,7 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
}
static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -4777,7 +4801,10 @@ static void _receive_cancel_reply_lockless(struct dlm_lkb *lkb,
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
- struct dlm_rsb *r = lkb->lkb_resource;
+ struct dlm_rsb *r;
+
+ lock_lkb(lkb);
+ r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
@@ -4786,6 +4813,7 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
}
static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -4818,7 +4846,9 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
/* ms->m_result is the value returned by dlm_master_lookup on dir node
FIXME: will a non-zero error ever be returned? */
+ lock_lkb(lkb);
r = lkb->lkb_resource;
+
hold_rsb(r);
lock_rsb(r);
@@ -4877,6 +4907,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
out:
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
dlm_put_lkb(lkb);
}
@@ -5096,7 +5127,8 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
}
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
- struct dlm_message *ms_stub)
+ struct dlm_message *ms_stub,
+ struct dlm_rsb *r)
{
if (middle_conversion(lkb)) {
hold_lkb(lkb);
@@ -5105,11 +5137,11 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
ms_stub->m_result = -EINPROGRESS;
ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
- _receive_convert_reply(lkb, ms_stub);
+ _receive_convert_reply_lockless(lkb, ms_stub, r);
/* Same special case as in receive_rcom_lock_args() */
lkb->lkb_grmode = DLM_LOCK_IV;
- rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
+ rsb_set_flag(r, RSB_RECOVER_CONVERT);
unhold_lkb(lkb);
} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
@@ -5143,28 +5175,23 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
- struct dlm_lkb *lkb, *safe;
struct dlm_message *ms_stub;
int wait_type, stub_unlock_result, stub_cancel_result;
- int dir_nodeid;
+ struct dlm_lkb *lkb;
+ int dir_nodeid, idx;
+ struct dlm_rsb *r;
ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
if (!ms_stub)
return;
- /* we do mutex_lock_nested() here in a different lockclass to avoid
- * a false positive report about a possible circular lock dependency.
- * The lockclass should be the same but doing it avoids the false
- * positive report. This report is about a different lock order
- * regarding to ls_waiters_mutex lock and res_mutex of lkb between here
- * and during lock operations. However lock operations cannot run in
- * parallel with dlm_recover_waiters_pre() because certain locks are
- * held which DLM_ASSERT_OPS_LOCKED(ls) is checking on.
- */
- DLM_ASSERT_OPS_LOCKED(ls);
- mutex_lock_nested(&ls->ls_waiters_mutex, 1);
+ idx = srcu_read_lock(&ls->ls_lkb_srcu);
+ list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ lock_lkb(lkb);
+ r = lkb->lkb_resource;
- list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
+ hold_rsb(r);
+ lock_rsb(r);
dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
@@ -5188,11 +5215,18 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
lkb->lkb_flags |= DLM_IFL_RESEND;
+ unlock_rsb(r);
+ put_rsb(r);
+ unlock_lkb(lkb);
continue;
}
- if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
+ if (!waiter_needs_recovery(ls, lkb, dir_nodeid)) {
+ unlock_rsb(r);
+ put_rsb(r);
+ unlock_lkb(lkb);
continue;
+ }
wait_type = lkb->lkb_wait_type;
stub_unlock_result = -DLM_EUNLOCK;
@@ -5227,7 +5261,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
break;
case DLM_MSG_CONVERT:
- recover_convert_waiter(ls, lkb, ms_stub);
+ recover_convert_waiter(ls, lkb, ms_stub, r);
break;
case DLM_MSG_UNLOCK:
@@ -5237,7 +5271,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
ms_stub->m_result = stub_unlock_result;
ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
- _receive_unlock_reply(lkb, ms_stub);
+ _receive_unlock_reply_lockless(lkb, ms_stub, r);
dlm_put_lkb(lkb);
break;
@@ -5248,7 +5282,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
ms_stub->m_result = stub_cancel_result;
ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
- _receive_cancel_reply(lkb, ms_stub);
+ _receive_cancel_reply_lockless(lkb, ms_stub, r);
dlm_put_lkb(lkb);
break;
@@ -5256,9 +5290,13 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
log_error(ls, "invalid lkb wait_type %d %d",
lkb->lkb_wait_type, wait_type);
}
+ unlock_rsb(r);
+ put_rsb(r);
+ unlock_lkb(lkb);
schedule();
}
- mutex_unlock(&ls->ls_waiters_mutex);
+ srcu_read_unlock(&ls->ls_lkb_srcu, idx);
+
kfree(ms_stub);
}
@@ -5267,15 +5305,15 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
struct dlm_lkb *lkb;
int found = 0;
- mutex_lock(&ls->ls_waiters_mutex);
- list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
+ lock_lkb(lkb);
if (lkb->lkb_flags & DLM_IFL_RESEND) {
hold_lkb(lkb);
found = 1;
break;
}
+ unlock_lkb(lkb);
}
- mutex_unlock(&ls->ls_waiters_mutex);
if (!found)
lkb = NULL;
@@ -5302,8 +5340,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
- int error = 0, mstype, err, oc, ou;
+ int error = 0, mstype, err, oc, ou, idx;
+ idx = srcu_read_lock(&ls->ls_lkb_srcu);
while (1) {
if (dlm_locking_stopped(ls)) {
log_debug(ls, "recover_waiters_post aborted");
@@ -5339,9 +5378,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
lkb->lkb_wait_type = 0;
lkb->lkb_wait_count = 0;
- mutex_lock(&ls->ls_waiters_mutex);
- list_del_init(&lkb->lkb_wait_reply);
- mutex_unlock(&ls->ls_waiters_mutex);
+ spin_lock(&ls->ls_waiters_lock);
+ list_del_rcu(&lkb->lkb_wait_reply);
+ spin_unlock(&ls->ls_waiters_lock);
unhold_lkb(lkb); /* for waiters list */
if (oc || ou) {
@@ -5385,11 +5424,14 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
"dir_nodeid %d overlap %d %d",
lkb->lkb_id, mstype, r->res_nodeid,
dlm_dir_nodeid(r), oc, ou);
+ unlock_lkb(lkb);
}
unlock_rsb(r);
put_rsb(r);
+ unlock_lkb(lkb);
dlm_put_lkb(lkb);
}
+ srcu_read_unlock(&ls->ls_lkb_srcu, idx);
return error;
}
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 456c6ec3ef6f..8029aa96e47c 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -74,5 +74,15 @@ static inline void unlock_rsb(struct dlm_rsb *r)
mutex_unlock(&r->res_mutex);
}
+static inline void lock_lkb(struct dlm_lkb *lkb)
+{
+ mutex_lock(&lkb->lkb_mutex);
+}
+
+static inline void unlock_lkb(struct dlm_lkb *lkb)
+{
+ mutex_unlock(&lkb->lkb_mutex);
+}
+
#endif
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 2896f96cf2d3..e1199c41301f 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -524,7 +524,8 @@ static int new_lockspace(const char *name, const char *cluster,
spin_lock_init(&ls->ls_lkbidr_spin);
INIT_LIST_HEAD(&ls->ls_waiters);
- mutex_init(&ls->ls_waiters_mutex);
+ init_srcu_struct(&ls->ls_lkb_srcu);
+ spin_lock_init(&ls->ls_waiters_lock);
INIT_LIST_HEAD(&ls->ls_orphans);
mutex_init(&ls->ls_orphans_mutex);
INIT_LIST_HEAD(&ls->ls_timeout);
@@ -810,6 +811,12 @@ static int release_lockspace(struct dlm_ls *ls, int force)
idr_destroy(&ls->ls_recover_idr);
kfree(ls->ls_recover_buf);
+ /*
+ * wait all readers of ls_waiters and srcu callers are left before free
+ */
+ srcu_barrier(&ls->ls_lkb_srcu);
+ cleanup_srcu_struct(&ls->ls_lkb_srcu);
+
/*
* Free all lkb's in idr
*/
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 8928e99dfd47..ed5a248af641 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -939,6 +939,11 @@ void dlm_clear_toss(struct dlm_ls *ls)
unsigned int count = 0;
int i;
+ /* _dlm_free_lkb() called by ls_lkb_srcu will clear toss as well,
+ * so call srcu_barrier() to be sure all pending srcu are done
+ */
+ srcu_barrier(&ls->ls_lkb_srcu);
+
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
--
2.27.0