cluster-devel.redhat.com archive mirror
 help / color / mirror / Atom feed
From: Alexander Aring <aahringo@redhat.com>
To: cluster-devel.redhat.com
Subject: [Cluster-devel] [RFC PATCH dlm/next 2/2] fs: dlm: change dlm logic locking behaviour
Date: Mon,  4 Oct 2021 16:58:05 -0400	[thread overview]
Message-ID: <20211004205805.3938460-3-aahringo@redhat.com> (raw)
In-Reply-To: <20211004205805.3938460-1-aahringo@redhat.com>

This patch will try to make some lock behaviour better inside the central
dlm locking logic functionality. This behaviour is that ls_waiter_mutex
and res_mutex is sometimes locked in a reverse order. The reverse order
can happen between the locking functionality dlm_lock()/dlm_unlock() and
recovery handling dlm_recover_waiters_pre().

The reverse order is not the only problem. DLM has a internal flag to
signal a different lock order between ls_waiter_mutex and res_mutex by
setting the DLM_IFL_STUB_MS flag so the ls_waiter_mutex will not get
held because it is already hold in case of dlm_recover_waiters_pre().

To somehow unwind this behaviour we remove the ls_waiter_mutex and
replace it with rcu handling. To lock anything else while central dlm
locking logic functionality is running concurrent this patch introduces
the lkb_lock and it's lock_lkb() and unlock_lkb() functionality similar
like the existing rsb lock.

Being able to not holding the ls_waiter_mutex before iterate ls_waiters
list allows us to never run a different lock order by using internal
lockless central dlm locking logic functionality and hold the requiered
locks lkb/rsb lock in each iteration before call dlm locking functionality.

As benefit we move the per lockspace ls_waiter_mutex to a per lkb lock.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/debug_fs.c     |   7 +-
 fs/dlm/dlm_internal.h |   5 +-
 fs/dlm/lock.c         | 190 ++++++++++++++++++++++++++----------------
 fs/dlm/lock.h         |  10 +++
 fs/dlm/lockspace.c    |   9 +-
 fs/dlm/recover.c      |   5 ++
 6 files changed, 147 insertions(+), 79 deletions(-)

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 47e9d57e4cae..d51da780ea12 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -704,12 +704,13 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 	struct dlm_ls *ls = file->private_data;
 	struct dlm_lkb *lkb;
 	size_t len = DLM_DEBUG_BUF_LEN, pos = 0, ret, rv;
+	int idx;
 
 	mutex_lock(&debug_buf_lock);
-	mutex_lock(&ls->ls_waiters_mutex);
 	memset(debug_buf, 0, sizeof(debug_buf));
 
-	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+	idx = srcu_read_lock(&ls->ls_lkb_srcu);
+	list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
 		ret = snprintf(debug_buf + pos, len - pos, "%x %d %d %s\n",
 			       lkb->lkb_id, lkb->lkb_wait_type,
 			       lkb->lkb_nodeid, lkb->lkb_resource->res_name);
@@ -717,7 +718,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
 			break;
 		pos += ret;
 	}
-	mutex_unlock(&ls->ls_waiters_mutex);
+	srcu_read_unlock(&ls->ls_lkb_srcu, idx);
 
 	rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
 	mutex_unlock(&debug_buf_lock);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 3fb610dfea19..4ac0d513d307 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -279,6 +279,8 @@ struct dlm_lkb {
 		void			*lkb_astparam;	/* caller's ast arg */
 		struct dlm_user_args	*lkb_ua;
 	};
+	struct mutex		lkb_mutex;
+	struct rcu_head		rcu;
 };
 
 /*
@@ -568,7 +570,8 @@ struct dlm_ls {
 	struct dlm_rsbtable	*ls_rsbtbl;
 	uint32_t		ls_rsbtbl_size;
 
-	struct mutex		ls_waiters_mutex;
+	struct srcu_struct	ls_lkb_srcu;
+	spinlock_t		ls_waiters_lock;
 	struct list_head	ls_waiters;	/* lkbs needing a reply */
 
 	struct mutex		ls_orphans_mutex;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index c419a5b5b7c0..78c7addcb3c8 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1199,6 +1199,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
 	INIT_LIST_HEAD(&lkb->lkb_cb_list);
 	mutex_init(&lkb->lkb_cb_mutex);
 	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
+	mutex_init(&lkb->lkb_mutex);
 
 	idr_preload(GFP_NOFS);
 	spin_lock(&ls->ls_lkbidr_spin);
@@ -1242,6 +1243,20 @@ static void kill_lkb(struct kref *kref)
 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 }
 
+
+static void _dlm_free_lkb(struct rcu_head *rcu)
+{
+	struct dlm_lkb *lkb = container_of(rcu, struct dlm_lkb, rcu);
+
+	detach_lkb(lkb);
+
+	/* for local/process lkbs, lvbptr points to caller's lksb */
+	if (lkb->lkb_lvbptr && is_master_copy(lkb))
+		dlm_free_lvb(lkb->lkb_lvbptr);
+
+	dlm_free_lkb(lkb);
+}
+
 /* __put_lkb() is used when an lkb may not have an rsb attached to
    it so we need to provide the lockspace explicitly */
 
@@ -1254,12 +1269,7 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 		idr_remove(&ls->ls_lkbidr, lkid);
 		spin_unlock(&ls->ls_lkbidr_spin);
 
-		detach_lkb(lkb);
-
-		/* for local/process lkbs, lvbptr points to caller's lksb */
-		if (lkb->lkb_lvbptr && is_master_copy(lkb))
-			dlm_free_lvb(lkb->lkb_lvbptr);
-		dlm_free_lkb(lkb);
+		call_srcu(&ls->ls_lkb_srcu, &lkb->rcu, _dlm_free_lkb);
 		return 1;
 	} else {
 		spin_unlock(&ls->ls_lkbidr_spin);
@@ -1402,22 +1412,27 @@ void dlm_scan_waiters(struct dlm_ls *ls)
 	u32 debug_expired = 0;
 	int num_nodes = 0;
 	int *warned = NULL;
+	int idx;
 
 	if (!dlm_config.ci_waitwarn_us)
 		return;
 
-	mutex_lock(&ls->ls_waiters_mutex);
-
-	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
-		if (!lkb->lkb_wait_time)
+	idx = srcu_read_lock(&ls->ls_lkb_srcu);
+	list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
+		lock_lkb(lkb);
+		if (!lkb->lkb_wait_time) {
+			unlock_lkb(lkb);
 			continue;
+		}
 
 		debug_scanned++;
 
 		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
 
-		if (us < dlm_config.ci_waitwarn_us)
+		if (us < dlm_config.ci_waitwarn_us) {
+			unlock_lkb(lkb);
 			continue;
+		}
 
 		lkb->lkb_wait_time = 0;
 
@@ -1429,16 +1444,21 @@ void dlm_scan_waiters(struct dlm_ls *ls)
 			num_nodes = ls->ls_num_nodes;
 			warned = kcalloc(num_nodes, sizeof(int), GFP_KERNEL);
 		}
-		if (!warned)
+		if (!warned) {
+			unlock_lkb(lkb);
 			continue;
-		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
+		}
+		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned)) {
+			unlock_lkb(lkb);
 			continue;
+		}
 
 		log_error(ls, "waitwarn %x %lld %d us check connection to "
 			  "node %d", lkb->lkb_id, (long long)us,
 			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
+		unlock_lkb(lkb);
 	}
-	mutex_unlock(&ls->ls_waiters_mutex);
+	srcu_read_unlock(&ls->ls_lkb_srcu, idx);
 	kfree(warned);
 
 	if (debug_expired)
@@ -1455,8 +1475,6 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int error = 0;
 
-	mutex_lock(&ls->ls_waiters_mutex);
-
 	if (is_overlap_unlock(lkb) ||
 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
 		error = -EINVAL;
@@ -1493,13 +1511,14 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 	lkb->lkb_wait_time = ktime_get();
 	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
 	hold_lkb(lkb);
-	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
+	spin_lock(&ls->ls_waiters_lock);
+	list_add_rcu(&lkb->lkb_wait_reply, &ls->ls_waiters);
+	spin_unlock(&ls->ls_waiters_lock);
  out:
 	if (error)
 		log_error(ls, "addwait error %x %d flags %x %d %d %s",
 			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
-	mutex_unlock(&ls->ls_waiters_mutex);
 	return error;
 }
 
@@ -1587,37 +1606,23 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
 
 	lkb->lkb_flags &= ~DLM_IFL_RESEND;
 	lkb->lkb_wait_count--;
-	if (!lkb->lkb_wait_count)
-		list_del_init(&lkb->lkb_wait_reply);
+	if (!lkb->lkb_wait_count) {
+		spin_lock(&ls->ls_waiters_lock);
+		list_del_rcu(&lkb->lkb_wait_reply);
+		spin_unlock(&ls->ls_waiters_lock);
+	}
 	unhold_lkb(lkb);
 	return 0;
 }
 
 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 {
-	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
-	int error;
-
-	mutex_lock(&ls->ls_waiters_mutex);
-	error = _remove_from_waiters(lkb, mstype, NULL);
-	mutex_unlock(&ls->ls_waiters_mutex);
-	return error;
+	return _remove_from_waiters(lkb, mstype, NULL);
 }
 
-/* Handles situations where we might be processing a "fake" or "stub" reply in
-   which we can't try to take waiters_mutex again. */
-
 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
 {
-	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
-	int error;
-
-	if (ms->m_flags != DLM_IFL_STUB_MS)
-		mutex_lock(&ls->ls_waiters_mutex);
-	error = _remove_from_waiters(lkb, ms->m_type, ms);
-	if (ms->m_flags != DLM_IFL_STUB_MS)
-		mutex_unlock(&ls->ls_waiters_mutex);
-	return error;
+	return _remove_from_waiters(lkb, ms->m_type, ms);
 }
 
 /* If there's an rsb for the same resource being removed, ensure
@@ -1925,6 +1930,7 @@ void dlm_adjust_timeouts(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb;
 	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
+	int idx;
 
 	ls->ls_recover_begin = 0;
 	mutex_lock(&ls->ls_timeout_mutex);
@@ -1935,12 +1941,14 @@ void dlm_adjust_timeouts(struct dlm_ls *ls)
 	if (!dlm_config.ci_waitwarn_us)
 		return;
 
-	mutex_lock(&ls->ls_waiters_mutex);
-	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+	idx = srcu_read_lock(&ls->ls_lkb_srcu);
+	list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
+		lock_lkb(lkb);
 		if (ktime_to_us(lkb->lkb_wait_time))
 			lkb->lkb_wait_time = ktime_get();
+		unlock_lkb(lkb);
 	}
-	mutex_unlock(&ls->ls_waiters_mutex);
+	srcu_read_unlock(&ls->ls_lkb_srcu, idx);
 }
 
 /* lkb is master or local copy */
@@ -3328,6 +3336,7 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
 	if (error)
 		return error;
 
+	lock_lkb(lkb);
 	lock_rsb(r);
 
 	attach_lkb(r, lkb);
@@ -3337,6 +3346,7 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
 
 	unlock_rsb(r);
 	put_rsb(r);
+	unlock_lkb(lkb);
 	return error;
 }
 
@@ -3346,6 +3356,7 @@ static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
 	struct dlm_rsb *r;
 	int error;
 
+	lock_lkb(lkb);
 	r = lkb->lkb_resource;
 
 	hold_rsb(r);
@@ -3359,6 +3370,7 @@ static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
  out:
 	unlock_rsb(r);
 	put_rsb(r);
+	unlock_lkb(lkb);
 	return error;
 }
 
@@ -3368,6 +3380,7 @@ static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
 	struct dlm_rsb *r;
 	int error;
 
+	lock_lkb(lkb);
 	r = lkb->lkb_resource;
 
 	hold_rsb(r);
@@ -3381,6 +3394,7 @@ static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
  out:
 	unlock_rsb(r);
 	put_rsb(r);
+	unlock_lkb(lkb);
 	return error;
 }
 
@@ -3390,6 +3404,7 @@ static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
 	struct dlm_rsb *r;
 	int error;
 
+	lock_lkb(lkb);
 	r = lkb->lkb_resource;
 
 	hold_rsb(r);
@@ -3403,6 +3418,7 @@ static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
  out:
 	unlock_rsb(r);
 	put_rsb(r);
+	unlock_lkb(lkb);
 	return error;
 }
 
@@ -4661,7 +4677,10 @@ static void _receive_convert_reply_lockless(struct dlm_lkb *lkb,
 
 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 {
-	struct dlm_rsb *r = lkb->lkb_resource;
+	struct dlm_rsb *r;
+
+	lock_lkb(lkb);
+	r = lkb->lkb_resource;
 
 	hold_rsb(r);
 	lock_rsb(r);
@@ -4670,6 +4689,7 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 
 	unlock_rsb(r);
 	put_rsb(r);
+	unlock_lkb(lkb);
 }
 
 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -4719,7 +4739,10 @@ static void _receive_unlock_reply_lockless(struct dlm_lkb *lkb,
 
 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 {
-	struct dlm_rsb *r = lkb->lkb_resource;
+	struct dlm_rsb *r;
+
+	lock_lkb(lkb);
+	r = lkb->lkb_resource;
 
 	hold_rsb(r);
 	lock_rsb(r);
@@ -4728,6 +4751,7 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 
 	unlock_rsb(r);
 	put_rsb(r);
+	unlock_lkb(lkb);
 }
 
 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -4777,7 +4801,10 @@ static void _receive_cancel_reply_lockless(struct dlm_lkb *lkb,
 
 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 {
-	struct dlm_rsb *r = lkb->lkb_resource;
+	struct dlm_rsb *r;
+
+	lock_lkb(lkb);
+	r = lkb->lkb_resource;
 
 	hold_rsb(r);
 	lock_rsb(r);
@@ -4786,6 +4813,7 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 
 	unlock_rsb(r);
 	put_rsb(r);
+	unlock_lkb(lkb);
 }
 
 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -4818,7 +4846,9 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	/* ms->m_result is the value returned by dlm_master_lookup on dir node
 	   FIXME: will a non-zero error ever be returned? */
 
+	lock_lkb(lkb);
 	r = lkb->lkb_resource;
+
 	hold_rsb(r);
 	lock_rsb(r);
 
@@ -4877,6 +4907,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
  out:
 	unlock_rsb(r);
 	put_rsb(r);
+	unlock_lkb(lkb);
 	dlm_put_lkb(lkb);
 }
 
@@ -5096,7 +5127,8 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
 }
 
 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
-				   struct dlm_message *ms_stub)
+				   struct dlm_message *ms_stub,
+				   struct dlm_rsb *r)
 {
 	if (middle_conversion(lkb)) {
 		hold_lkb(lkb);
@@ -5105,11 +5137,11 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
 		ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
 		ms_stub->m_result = -EINPROGRESS;
 		ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
-		_receive_convert_reply(lkb, ms_stub);
+		_receive_convert_reply_lockless(lkb, ms_stub, r);
 
 		/* Same special case as in receive_rcom_lock_args() */
 		lkb->lkb_grmode = DLM_LOCK_IV;
-		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
+		rsb_set_flag(r, RSB_RECOVER_CONVERT);
 		unhold_lkb(lkb);
 
 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
@@ -5143,28 +5175,23 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
 
 void dlm_recover_waiters_pre(struct dlm_ls *ls)
 {
-	struct dlm_lkb *lkb, *safe;
 	struct dlm_message *ms_stub;
 	int wait_type, stub_unlock_result, stub_cancel_result;
-	int dir_nodeid;
+	struct dlm_lkb *lkb;
+	int dir_nodeid, idx;
+	struct dlm_rsb *r;
 
 	ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
 	if (!ms_stub)
 		return;
 
-	/* we do mutex_lock_nested() here in a different lockclass to avoid
-	 * a false positive report about a possible circular lock dependency.
-	 * The lockclass should be the same but doing it avoids the false
-	 * positive report. This report is about a different lock order
-	 * regarding to ls_waiters_mutex lock and res_mutex of lkb between here
-	 * and during lock operations. However lock operations cannot run in
-	 * parallel with dlm_recover_waiters_pre() because certain locks are
-	 * held which DLM_ASSERT_OPS_LOCKED(ls) is checking on.
-	 */
-	DLM_ASSERT_OPS_LOCKED(ls);
-	mutex_lock_nested(&ls->ls_waiters_mutex, 1);
+	idx = srcu_read_lock(&ls->ls_lkb_srcu);
+	list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
+		lock_lkb(lkb);
+		r = lkb->lkb_resource;
 
-	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
+		hold_rsb(r);
+		lock_rsb(r);
 
 		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
 
@@ -5188,11 +5215,18 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 
 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
 			lkb->lkb_flags |= DLM_IFL_RESEND;
+			unlock_rsb(r);
+			put_rsb(r);
+			unlock_lkb(lkb);
 			continue;
 		}
 
-		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
+		if (!waiter_needs_recovery(ls, lkb, dir_nodeid)) {
+			unlock_rsb(r);
+			put_rsb(r);
+			unlock_lkb(lkb);
 			continue;
+		}
 
 		wait_type = lkb->lkb_wait_type;
 		stub_unlock_result = -DLM_EUNLOCK;
@@ -5227,7 +5261,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 			break;
 
 		case DLM_MSG_CONVERT:
-			recover_convert_waiter(ls, lkb, ms_stub);
+			recover_convert_waiter(ls, lkb, ms_stub, r);
 			break;
 
 		case DLM_MSG_UNLOCK:
@@ -5237,7 +5271,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 			ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
 			ms_stub->m_result = stub_unlock_result;
 			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
-			_receive_unlock_reply(lkb, ms_stub);
+			_receive_unlock_reply_lockless(lkb, ms_stub, r);
 			dlm_put_lkb(lkb);
 			break;
 
@@ -5248,7 +5282,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 			ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
 			ms_stub->m_result = stub_cancel_result;
 			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
-			_receive_cancel_reply(lkb, ms_stub);
+			_receive_cancel_reply_lockless(lkb, ms_stub, r);
 			dlm_put_lkb(lkb);
 			break;
 
@@ -5256,9 +5290,13 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 			log_error(ls, "invalid lkb wait_type %d %d",
 				  lkb->lkb_wait_type, wait_type);
 		}
+		unlock_rsb(r);
+		put_rsb(r);
+		unlock_lkb(lkb);
 		schedule();
 	}
-	mutex_unlock(&ls->ls_waiters_mutex);
+	srcu_read_unlock(&ls->ls_lkb_srcu, idx);
+
 	kfree(ms_stub);
 }
 
@@ -5267,15 +5305,15 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 	struct dlm_lkb *lkb;
 	int found = 0;
 
-	mutex_lock(&ls->ls_waiters_mutex);
-	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+	list_for_each_entry_rcu(lkb, &ls->ls_waiters, lkb_wait_reply) {
+		lock_lkb(lkb);
 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
 			hold_lkb(lkb);
 			found = 1;
 			break;
 		}
+		unlock_lkb(lkb);
 	}
-	mutex_unlock(&ls->ls_waiters_mutex);
 
 	if (!found)
 		lkb = NULL;
@@ -5302,8 +5340,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb;
 	struct dlm_rsb *r;
-	int error = 0, mstype, err, oc, ou;
+	int error = 0, mstype, err, oc, ou, idx;
 
+	idx = srcu_read_lock(&ls->ls_lkb_srcu);
 	while (1) {
 		if (dlm_locking_stopped(ls)) {
 			log_debug(ls, "recover_waiters_post aborted");
@@ -5339,9 +5378,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
 		lkb->lkb_wait_type = 0;
 		lkb->lkb_wait_count = 0;
-		mutex_lock(&ls->ls_waiters_mutex);
-		list_del_init(&lkb->lkb_wait_reply);
-		mutex_unlock(&ls->ls_waiters_mutex);
+		spin_lock(&ls->ls_waiters_lock);
+		list_del_rcu(&lkb->lkb_wait_reply);
+		spin_unlock(&ls->ls_waiters_lock);
 		unhold_lkb(lkb); /* for waiters list */
 
 		if (oc || ou) {
@@ -5385,11 +5424,14 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 				  "dir_nodeid %d overlap %d %d",
 				  lkb->lkb_id, mstype, r->res_nodeid,
 				  dlm_dir_nodeid(r), oc, ou);
+			unlock_lkb(lkb);
 		}
 		unlock_rsb(r);
 		put_rsb(r);
+		unlock_lkb(lkb);
 		dlm_put_lkb(lkb);
 	}
+	srcu_read_unlock(&ls->ls_lkb_srcu, idx);
 
 	return error;
 }
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 456c6ec3ef6f..8029aa96e47c 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -74,5 +74,15 @@ static inline void unlock_rsb(struct dlm_rsb *r)
 	mutex_unlock(&r->res_mutex);
 }
 
+static inline void lock_lkb(struct dlm_lkb *lkb)
+{
+	mutex_lock(&lkb->lkb_mutex);
+}
+
+static inline void unlock_lkb(struct dlm_lkb *lkb)
+{
+	mutex_unlock(&lkb->lkb_mutex);
+}
+
 #endif
 
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 2896f96cf2d3..e1199c41301f 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -524,7 +524,8 @@ static int new_lockspace(const char *name, const char *cluster,
 	spin_lock_init(&ls->ls_lkbidr_spin);
 
 	INIT_LIST_HEAD(&ls->ls_waiters);
-	mutex_init(&ls->ls_waiters_mutex);
+	init_srcu_struct(&ls->ls_lkb_srcu);
+	spin_lock_init(&ls->ls_waiters_lock);
 	INIT_LIST_HEAD(&ls->ls_orphans);
 	mutex_init(&ls->ls_orphans_mutex);
 	INIT_LIST_HEAD(&ls->ls_timeout);
@@ -810,6 +811,12 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	idr_destroy(&ls->ls_recover_idr);
 	kfree(ls->ls_recover_buf);
 
+	/*
+	 * wait all readers of ls_waiters and srcu callers are left before free
+	 */
+	srcu_barrier(&ls->ls_lkb_srcu);
+	cleanup_srcu_struct(&ls->ls_lkb_srcu);
+
 	/*
 	 * Free all lkb's in idr
 	 */
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 8928e99dfd47..ed5a248af641 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -939,6 +939,11 @@ void dlm_clear_toss(struct dlm_ls *ls)
 	unsigned int count = 0;
 	int i;
 
+	/* _dlm_free_lkb() called by ls_lkb_srcu will clear toss as well,
+	 * so call srcu_barrier() to be sure all pending srcu are done
+	 */
+	srcu_barrier(&ls->ls_lkb_srcu);
+
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
-- 
2.27.0



      parent reply	other threads:[~2021-10-04 20:58 UTC|newest]

Thread overview: 3+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-10-04 20:58 [Cluster-devel] [RFC PATCH dlm/next 0/2] fs: dlm: remove reverse locking order Alexander Aring
2021-10-04 20:58 ` [Cluster-devel] [RFC PATCH dlm/next 1/2] fs: dlm: provide some lockless functionality Alexander Aring
2021-10-04 20:58 ` Alexander Aring [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20211004205805.3938460-3-aahringo@redhat.com \
    --to=aahringo@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).