* [RFC dlm/next 00/11] dlm: approach for new lkb reference counting
@ 2024-11-07 20:46 Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 01/11] dlm: remove set_master() negative return check Alexander Aring
` (10 more replies)
0 siblings, 11 replies; 12+ messages in thread
From: Alexander Aring @ 2024-11-07 20:46 UTC (permalink / raw)
To: teigland; +Cc: gfs2, aahringo
Hi,
this is a new approach to changing the lkb refcounting model. The current
handling requires us to assume that certain return values can or cannot
happen, especially for the _request_lock() function. However, we evaluate
the return value in some places and not in others (which requires that
certain return values cannot happen there). That is a big headache because
we also need to consider recovery handling and state changes in between.
The new approach is tightly coupled to the ast handling and, in my opinion,
more robust, because we would notice earlier if an ast callback is not
going to happen. The lkb lifetime has different states: request, convert
and unlock. The ast callback result value means something different in
each lifetime state, e.g. an -EAGAIN on a request ends the lkb lifetime,
but an -EAGAIN on a convert does not.
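A minimal user-space sketch of the idea above, assuming hypothetical names
(enum lkb_lifetime, cast_ends_lifetime()) that do not appear in the series.
Only the two -EAGAIN cases come from this cover letter; the other rows are
assumptions made to keep the example complete, and cancel is left out.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>

/* Hypothetical lifetime states, named after the cover letter. */
enum lkb_lifetime {
	LKB_LT_REQUEST,
	LKB_LT_CONVERT,
	LKB_LT_UNLOCK,
};

/*
 * Return true if delivering the ast result "rv" while the lkb is in
 * lifetime state "lt" ends the lkb lifetime.
 */
static bool cast_ends_lifetime(enum lkb_lifetime lt, int rv)
{
	switch (lt) {
	case LKB_LT_REQUEST:
		/* from the cover letter: -EAGAIN on a request ends the lifetime;
		 * assumption: any other non-zero result does too
		 */
		return rv != 0;
	case LKB_LT_CONVERT:
		/* from the cover letter: -EAGAIN on a convert does not end it;
		 * assumption: no convert result does
		 */
		return false;
	case LKB_LT_UNLOCK:
		/* assumption: a completed unlock is the last event for the lkb */
		return true;
	}
	return false;
}
```

With this shape, the decision to drop the final reference sits in one
place next to the ast delivery instead of being spread over the callers
that look at return values.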
Now we require that even master copies call queue_cast(), but we don't
schedule a callback for the user because master copies are not operating
on DLM API user requests. We can now also drop a lot of return values from
DLM lock request functions that are also called for internal lock requests,
e.g. by recovery, as we no longer need to evaluate the return value for a
potential put(); this is now done in queue_cast().
We can now also sanity-check that requests, converts and cancels/unlocks
happen in the right lkb lifetime state, e.g. a request cannot happen
after a convert, and a convert cannot happen after an unlock.
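The sanity check described above could look roughly like the following
user-space sketch. The enum lkb_lt_state and lifetime_transition_ok() are
hypothetical names for illustration; only the two rejected transitions
(request after convert, convert after unlock) are taken from this cover
letter, the rest of the table is an assumption.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical states; LT_NEW means no operation was issued yet. */
enum lkb_lt_state {
	LT_NEW,
	LT_REQUEST,
	LT_CONVERT,
	LT_UNLOCK,
};

/* Return true if an operation of kind "next" is allowed in state "cur". */
static bool lifetime_transition_ok(enum lkb_lt_state cur,
				   enum lkb_lt_state next)
{
	switch (next) {
	case LT_REQUEST:
		/* a request can only start a lifetime */
		return cur == LT_NEW;
	case LT_CONVERT:
		/* a convert follows a request or an earlier convert */
		return cur == LT_REQUEST || cur == LT_CONVERT;
	case LT_UNLOCK:
		/* assumption: cancel/unlock may follow a request or a convert */
		return cur == LT_REQUEST || cur == LT_CONVERT;
	case LT_NEW:
		/* nothing moves back to the initial state */
		return false;
	}
	return false;
}
```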
- Alex
Alexander Aring (11):
dlm: remove set_master() negative return check
dlm: use move_lkb() instead del/add lkb
dlm: use hold_lkb() instead kref_get()
dlm: don't track references on move_lkb()
dlm: drop lkb hold for waiter conversion handling
dlm: track reference for lkb_rsb_lookup
dlm: call queue_cast() on master copy as well
dlm: make send dlm message as non-failure
dlm: introduce new lkb refcount model
dlm: void convert, cancel and unlock requests
dlm: return void for _request_lock function
fs/dlm/dlm_internal.h | 13 ++
fs/dlm/lock.c | 522 ++++++++++++++++++++++--------------------
fs/dlm/lock.h | 4 +-
fs/dlm/user.c | 5 +-
4 files changed, 290 insertions(+), 254 deletions(-)
--
2.43.0
* [RFC dlm/next 01/11] dlm: remove set_master() negative return check
2024-11-07 20:46 [RFC dlm/next 00/11] dlm: approach for new lkb reference counting Alexander Aring
@ 2024-11-07 20:46 ` Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 02/11] dlm: use move_lkb() instead del/add lkb Alexander Aring
` (9 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-11-07 20:46 UTC (permalink / raw)
To: teigland; +Cc: gfs2, aahringo
Remove the set_master() negative return check as set_master() cannot
return a negative return value.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 2 --
1 file changed, 2 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 966a926c301b..cee32aa6b218 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3174,8 +3174,6 @@ static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
/* set_master: sets lkb nodeid from r */
error = set_master(r, lkb);
- if (error < 0)
- goto out;
if (error) {
error = 0;
goto out;
--
2.43.0
* [RFC dlm/next 02/11] dlm: use move_lkb() instead del/add lkb
2024-11-07 20:46 [RFC dlm/next 00/11] dlm: approach for new lkb reference counting Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 01/11] dlm: remove set_master() negative return check Alexander Aring
@ 2024-11-07 20:46 ` Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 03/11] dlm: use hold_lkb() instead kref_get() Alexander Aring
` (8 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-11-07 20:46 UTC (permalink / raw)
To: teigland; +Cc: gfs2, aahringo
Use move_lkb() where we do a del_lkb() followed by an add_lkb(), as that
sequence is a move to a different statequeue.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index cee32aa6b218..32fdee78e7f5 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3097,8 +3097,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
if (can_be_queued(lkb)) {
error = -EINPROGRESS;
- del_lkb(r, lkb);
- add_lkb(r, lkb, DLM_LKSTS_CONVERT);
+ move_lkb(r, lkb, DLM_LKSTS_CONVERT);
goto out;
}
@@ -4512,8 +4511,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
receive_flags_reply(lkb, ms, local);
if (is_demoted(lkb))
munge_demoted(lkb);
- del_lkb(r, lkb);
- add_lkb(r, lkb, DLM_LKSTS_CONVERT);
+ move_lkb(r, lkb, DLM_LKSTS_CONVERT);
break;
case 0:
--
2.43.0
* [RFC dlm/next 03/11] dlm: use hold_lkb() instead kref_get()
2024-11-07 20:46 [RFC dlm/next 00/11] dlm: approach for new lkb reference counting Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 01/11] dlm: remove set_master() negative return check Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 02/11] dlm: use move_lkb() instead del/add lkb Alexander Aring
@ 2024-11-07 20:46 ` Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 04/11] dlm: don't track references on move_lkb() Alexander Aring
` (7 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-11-07 20:46 UTC (permalink / raw)
To: teigland; +Cc: gfs2, aahringo
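Use the internal dlm helper hold_lkb() instead of calling kref_get()
directly on an lkb. Move hold_lkb() above find_lkb() so it is defined
before its first use.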
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 21 +++++++++++----------
1 file changed, 11 insertions(+), 10 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 32fdee78e7f5..42cef3d2af47 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1523,6 +1523,15 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
}
+/* This is only called to add a reference when the code already holds
+ * a valid reference to the lkb, so there's no need for locking.
+ */
+
+static inline void hold_lkb(struct dlm_lkb *lkb)
+{
+ kref_get(&lkb->lkb_ref);
+}
+
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
struct dlm_lkb *lkb;
@@ -1536,7 +1545,7 @@ static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
*/
read_lock_bh(&ls->ls_lkbxa_lock);
if (kref_read(&lkb->lkb_ref))
- kref_get(&lkb->lkb_ref);
+ hold_lkb(lkb);
else
lkb = NULL;
read_unlock_bh(&ls->ls_lkbxa_lock);
@@ -1593,14 +1602,6 @@ int dlm_put_lkb(struct dlm_lkb *lkb)
return __put_lkb(ls, lkb);
}
-/* This is only called to add a reference when the code already holds
- a valid reference to the lkb, so there's no need for locking. */
-
-static inline void hold_lkb(struct dlm_lkb *lkb)
-{
- kref_get(&lkb->lkb_ref);
-}
-
static void unhold_lkb_assert(struct kref *kref)
{
struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
@@ -1638,7 +1639,7 @@ static void lkb_add_ordered(struct list_head *new, struct list_head *head,
static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
- kref_get(&lkb->lkb_ref);
+ hold_lkb(lkb);
DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
--
2.43.0
* [RFC dlm/next 04/11] dlm: don't track references on move_lkb()
2024-11-07 20:46 [RFC dlm/next 00/11] dlm: approach for new lkb reference counting Alexander Aring
` (2 preceding siblings ...)
2024-11-07 20:46 ` [RFC dlm/next 03/11] dlm: use hold_lkb() instead kref_get() Alexander Aring
@ 2024-11-07 20:46 ` Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 05/11] dlm: drop lkb hold for waiter conversion handling Alexander Aring
` (6 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-11-07 20:46 UTC (permalink / raw)
To: teigland; +Cc: gfs2, aahringo
move_lkb() moves an lkb from one statequeue to another. There is no
need to drop the reference and take it again once the move is done.
Add statequeue helpers that do not track references so move_lkb() can
use them.
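As an illustration only, here is a user-space toy model of the helper
split. struct toy_lkb, the toy_* functions and the TOY_STS_* values are
made up for this sketch; a plain integer stands in for the kref and the
status field stands in for membership on an rsb statequeue.

```c
#include <assert.h>

/* Made-up status values; 0 means "on no queue". */
#define TOY_STS_GRANTED	2
#define TOY_STS_CONVERT	3

struct toy_lkb {
	int refcount;	/* stands in for lkb_ref */
	int status;	/* stands in for lkb_status */
};

static void toy_add_noref(struct toy_lkb *lkb, int status)
{
	assert(!lkb->status);	/* must not already be on a queue */
	lkb->status = status;
}

static void toy_del_noref(struct toy_lkb *lkb)
{
	lkb->status = 0;
}

/* The queue takes a reference when the lkb is added ... */
static void toy_add(struct toy_lkb *lkb, int status)
{
	lkb->refcount++;
	toy_add_noref(lkb, status);
}

/* ... and gives it back when the lkb is removed. */
static void toy_del(struct toy_lkb *lkb)
{
	toy_del_noref(lkb);
	lkb->refcount--;
}

/* A move changes the queue only; the reference count stays untouched. */
static void toy_move(struct toy_lkb *lkb, int status)
{
	toy_del_noref(lkb);
	toy_add_noref(lkb, status);
}
```

In the toy model the count never dips between the del and the add inside
toy_move(), which is the property the commit message is after.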
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 21 +++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 42cef3d2af47..378234b42593 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1637,10 +1637,8 @@ static void lkb_add_ordered(struct list_head *new, struct list_head *head,
/* add/remove lkb to rsb's grant/convert/wait queue */
-static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
+static void add_lkb_noref(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
- hold_lkb(lkb);
-
DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
lkb->lkb_timestamp = ktime_get();
@@ -1671,17 +1669,28 @@ static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
}
}
-static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
+{
+ hold_lkb(lkb);
+ add_lkb_noref(r, lkb, status);
+}
+
+static void del_lkb_noref(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
lkb->lkb_status = 0;
list_del(&lkb->lkb_statequeue);
+}
+
+static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ del_lkb_noref(r, lkb);
unhold_lkb(lkb);
}
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
- del_lkb(r, lkb);
- add_lkb(r, lkb, sts);
+ del_lkb_noref(r, lkb);
+ add_lkb_noref(r, lkb, sts);
}
static int msg_reply_type(int mstype)
--
2.43.0
* [RFC dlm/next 05/11] dlm: drop lkb hold for waiter conversion handling
2024-11-07 20:46 [RFC dlm/next 00/11] dlm: approach for new lkb reference counting Alexander Aring
` (3 preceding siblings ...)
2024-11-07 20:46 ` [RFC dlm/next 04/11] dlm: don't track references on move_lkb() Alexander Aring
@ 2024-11-07 20:46 ` Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 06/11] dlm: track reference for lkb_rsb_lookup Alexander Aring
` (5 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-11-07 20:46 UTC (permalink / raw)
To: teigland; +Cc: gfs2, aahringo
Drop the extra reference hold when handling a conversion, as a
conversion should never free an lkb. Add comments to the unlock and
cancel cases, because the internal remove_from_waiters() call can end
the lkb lifetime there.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 378234b42593..1f4f2d24bef4 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -5005,7 +5005,6 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms_local)
{
if (middle_conversion(lkb)) {
- hold_lkb(lkb);
memset(ms_local, 0, sizeof(struct dlm_message));
ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
@@ -5015,7 +5014,6 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
/* Same special case as in receive_rcom_lock_args() */
lkb->lkb_grmode = DLM_LOCK_IV;
rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
- unhold_lkb(lkb);
} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
@@ -5120,10 +5118,17 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
break;
case DLM_MSG_CONVERT:
+ /* a convert should never end an lkb's lifetime */
recover_convert_waiter(ls, lkb, ms_local);
break;
case DLM_MSG_UNLOCK:
+ /* _receive_unlock_reply() can call remove_from_waiters(),
+ * which may free the lkb by dropping the reference held
+ * for the ls_waiters list we are iterating. Hold the lkb
+ * here to prevent a use after free when the lkb is
+ * removed from the waiters list.
+ */
hold_lkb(lkb);
memset(ms_local, 0, sizeof(struct dlm_message));
ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
@@ -5134,6 +5139,12 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
break;
case DLM_MSG_CANCEL:
+ /* _receive_cancel_reply() can call remove_from_waiters(),
+ * which may free the lkb by dropping the reference held
+ * for the ls_waiters list we are iterating. Hold the lkb
+ * here to prevent a use after free when the lkb is
+ * removed from the waiters list.
+ */
hold_lkb(lkb);
memset(ms_local, 0, sizeof(struct dlm_message));
ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
--
2.43.0
* [RFC dlm/next 06/11] dlm: track reference for lkb_rsb_lookup
2024-11-07 20:46 [RFC dlm/next 00/11] dlm: approach for new lkb reference counting Alexander Aring
` (4 preceding siblings ...)
2024-11-07 20:46 ` [RFC dlm/next 05/11] dlm: drop lkb hold for waiter conversion handling Alexander Aring
@ 2024-11-07 20:46 ` Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 07/11] dlm: call queue_cast() on master copy as well Alexander Aring
` (4 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-11-07 20:46 UTC (permalink / raw)
To: teigland; +Cc: gfs2, aahringo
We currently don't track a reference for the lkb_rsb_lookup list. This
probably only works because _request_lock() ends up sending a request to
a remote node that is the lock master, since we don't evaluate the return
value of _request_lock() for a potential -EAGAIN. Future patches introduce
a new reference counting model for lkbs that does not require evaluating
the return value of _request_lock() for a potential put. There might be
issues because recovery could run in between and change the lock master.
In any case, we now take a reference while an lkb is on the
lkb_rsb_lookup list and drop it again when the lkb is removed from it.
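A user-space toy model of why the list reference matters. All names
(struct toy_lkb, struct toy_rsb, the toy_* functions, the request_fails
flag) are hypothetical; the failing request that drops the creation
reference is an assumption used to show the effect, not a statement about
what _request_lock() does today.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct toy_lkb {
	int refcount;		/* stands in for lkb_ref */
	bool request_fails;	/* assumption: request ends the lifetime */
	struct toy_lkb *next;	/* link on the toy lookup list */
};

struct toy_rsb {
	struct toy_lkb *lookup;	/* stands in for res_lookup */
};

/* Stands in for _request_lock(); a failure drops the creation reference. */
static void toy_request_lock(struct toy_lkb *lkb)
{
	if (lkb->request_fails)
		lkb->refcount--;
}

/* Append to the lookup list; the list owns a reference from now on. */
static void toy_lookup_add(struct toy_rsb *r, struct toy_lkb *lkb)
{
	struct toy_lkb **pos = &r->lookup;

	lkb->refcount++;
	while (*pos)
		pos = &(*pos)->next;
	lkb->next = NULL;
	*pos = lkb;
}

/* Drain the list, dropping the list reference after each request. */
static void toy_process_lookup_list(struct toy_rsb *r)
{
	while (r->lookup) {
		struct toy_lkb *lkb = r->lookup;

		r->lookup = lkb->next;
		lkb->next = NULL;
		toy_request_lock(lkb);
		/* the list reference keeps the lkb valid up to this point */
		assert(lkb->refcount > 0);
		lkb->refcount--;
	}
}
```

Without the increment in toy_lookup_add(), the failing request would take
the count to zero while the loop still holds a pointer to the lkb.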
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 1f4f2d24bef4..b570e9f7cc6f 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -2658,6 +2658,7 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
}
if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
+ hold_lkb(lkb);
list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
return 1;
}
@@ -2700,6 +2701,7 @@ static void process_lookup_list(struct dlm_rsb *r)
list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
list_del_init(&lkb->lkb_rsb_lookup);
_request_lock(r, lkb);
+ dlm_put_lkb(lkb);
}
}
@@ -2734,6 +2736,7 @@ static void confirm_master(struct dlm_rsb *r, int error)
list_del_init(&lkb->lkb_rsb_lookup);
r->res_first_lkid = lkb->lkb_id;
_request_lock(r, lkb);
+ dlm_put_lkb(lkb);
}
break;
@@ -2904,6 +2907,8 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
args->flags & DLM_LKF_CANCEL ?
-DLM_ECANCEL : -DLM_EUNLOCK);
unhold_lkb(lkb); /* undoes create_lkb() */
+ /* for lkb_rsb_lookup */
+ dlm_put_lkb(lkb);
}
/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
goto out;
--
2.43.0
* [RFC dlm/next 07/11] dlm: call queue_cast() on master copy as well
2024-11-07 20:46 [RFC dlm/next 00/11] dlm: approach for new lkb reference counting Alexander Aring
` (5 preceding siblings ...)
2024-11-07 20:46 ` [RFC dlm/next 06/11] dlm: track reference for lkb_rsb_lookup Alexander Aring
@ 2024-11-07 20:46 ` Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 08/11] dlm: make send dlm message as non-failure Alexander Aring
` (3 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-11-07 20:46 UTC (permalink / raw)
To: teigland; +Cc: gfs2, aahringo
To prepare for a new lkb refcounting model that is tied to the lkb
request state and to the ast callback being queued, queue_cast() must be
called for master copies as well. queue_cast() currently ignores any call
for a master copy lkb, so this patch should not change behavior.
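A toy user-space model of the resulting call pattern, with hypothetical
names (struct toy_lkb, toy_queue_cast(), toy_grant_pending()) and counters
standing in for the real side effects:

```c
#include <assert.h>
#include <stdbool.h>

struct toy_lkb {
	bool master_copy;	/* lkb kept on the master for a remote owner */
	int casts_queued;	/* callbacks scheduled for a local user */
};

/* Stands in for queue_cast(): master copies have no local user. */
static void toy_queue_cast(struct toy_lkb *lkb, int rv)
{
	(void)rv;
	if (lkb->master_copy)
		return;
	lkb->casts_queued++;
}

/* Stands in for grant_lock_pending() after this patch. */
static void toy_grant_pending(struct toy_lkb *lkb, int *grants_sent)
{
	if (lkb->master_copy)
		(*grants_sent)++;	/* stands in for send_grant() */
	toy_queue_cast(lkb, 0);		/* now called unconditionally */
}
```

The caller no longer decides whether a cast applies; that decision lives
in one function, which is where a later patch can hook the refcounting.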
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index b570e9f7cc6f..f2060a9d78f3 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -2087,8 +2087,7 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
grant_lock(r, lkb);
if (is_master_copy(lkb))
send_grant(r, lkb);
- else
- queue_cast(r, lkb, 0);
+ queue_cast(r, lkb, 0);
}
/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
--
2.43.0
* [RFC dlm/next 08/11] dlm: make send dlm message as non-failure
2024-11-07 20:46 [RFC dlm/next 00/11] dlm: approach for new lkb reference counting Alexander Aring
` (6 preceding siblings ...)
2024-11-07 20:46 ` [RFC dlm/next 07/11] dlm: call queue_cast() on master copy as well Alexander Aring
@ 2024-11-07 20:46 ` Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 09/11] dlm: introduce new lkb refcount model Alexander Aring
` (2 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-11-07 20:46 UTC (permalink / raw)
To: teigland; +Cc: gfs2, aahringo
We cannot handle a failure to send a DLM message, so change every send
function to return void.
There may be other ways to handle send failures internally in the future,
but from the caller's perspective a send should never fail.
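One possible shape of a "cannot fail from the caller's view" send path,
as a user-space sketch. Everything here (struct toy_msg, the toy_*
functions and counters, the toy_out_of_buffers switch) is hypothetical.
In this toy version the create helper returns the buffer or NULL so the
sender can stop before touching it; that is a choice made for the sketch,
not something taken from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct toy_msg {
	int type;
	int result;
};

static struct toy_msg toy_buf;		/* single toy message buffer */
static bool toy_out_of_buffers;		/* test switch to force a failure */
static int toy_send_failures;		/* sends dropped internally */
static int toy_sent;			/* sends that went out */

/* Return a message buffer, or NULL if none is available. */
static struct toy_msg *toy_create_message(int type)
{
	if (toy_out_of_buffers) {
		toy_send_failures++;	/* the kernel patch warns here */
		return NULL;
	}
	toy_buf.type = type;
	toy_buf.result = 0;
	return &toy_buf;
}

/* The caller has no error path: failures are dealt with in here. */
static void toy_send_reply(int type, int rv)
{
	struct toy_msg *ms = toy_create_message(type);

	if (!ms)
		return;
	ms->result = rv;
	toy_sent++;	/* stands in for committing the buffer */
}
```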
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 235 ++++++++++++++++++--------------------------------
fs/dlm/lock.h | 4 +-
fs/dlm/user.c | 5 +-
3 files changed, 86 insertions(+), 158 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index f2060a9d78f3..9d2d3567bf9d 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -75,14 +75,14 @@
#include "user.h"
#include "config.h"
-static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
-static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
-static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
-static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
-static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
-static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
-static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
-static int send_remove(struct dlm_rsb *r);
+static void send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
+static void send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -1693,23 +1693,6 @@ static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
add_lkb_noref(r, lkb, sts);
}
-static int msg_reply_type(int mstype)
-{
- switch (mstype) {
- case DLM_MSG_REQUEST:
- return DLM_MSG_REQUEST_REPLY;
- case DLM_MSG_CONVERT:
- return DLM_MSG_CONVERT_REPLY;
- case DLM_MSG_UNLOCK:
- return DLM_MSG_UNLOCK_REPLY;
- case DLM_MSG_CANCEL:
- return DLM_MSG_CANCEL_REPLY;
- case DLM_MSG_LOOKUP:
- return DLM_MSG_LOOKUP_REPLY;
- }
- return -1;
-}
-
/* add/remove lkb from global waiters list of lkb's waiting for
a reply from a remote node */
@@ -3194,7 +3177,7 @@ static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
if (is_remote(r)) {
/* receive_request() calls do_request() on remote node */
- error = send_request(r, lkb);
+ send_request(r, lkb);
} else {
error = do_request(r, lkb);
/* for remote locks the request_reply is sent
@@ -3209,11 +3192,11 @@ static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- int error;
+ int error = 0;
if (is_remote(r)) {
/* receive_convert() calls do_convert() on remote node */
- error = send_convert(r, lkb);
+ send_convert(r, lkb);
} else {
error = do_convert(r, lkb);
/* for remote locks the convert_reply is sent
@@ -3228,11 +3211,11 @@ static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- int error;
+ int error = 0;
if (is_remote(r)) {
/* receive_unlock() calls do_unlock() on remote node */
- error = send_unlock(r, lkb);
+ send_unlock(r, lkb);
} else {
error = do_unlock(r, lkb);
/* for remote locks the unlock_reply is sent
@@ -3247,11 +3230,11 @@ static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- int error;
+ int error = 0;
if (is_remote(r)) {
/* receive_cancel() calls do_cancel() on remote node */
- error = send_cancel(r, lkb);
+ send_cancel(r, lkb);
} else {
error = do_cancel(r, lkb);
/* for remote locks the cancel_reply is sent
@@ -3489,10 +3472,10 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
* receive_lookup_reply send_lookup_reply
*/
-static int _create_message(struct dlm_ls *ls, int mb_len,
- int to_nodeid, int mstype,
- struct dlm_message **ms_ret,
- struct dlm_mhandle **mh_ret)
+static void _create_message(struct dlm_ls *ls, int mb_len,
+ int to_nodeid, int mstype,
+ struct dlm_message **ms_ret,
+ struct dlm_mhandle **mh_ret)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
@@ -3503,8 +3486,8 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
write our data into */
mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
- if (!mh)
- return -ENOBUFS;
+ if (WARN_ON(!mh))
+ return;
ms = (struct dlm_message *) mb;
@@ -3518,13 +3501,12 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
*mh_ret = mh;
*ms_ret = ms;
- return 0;
}
-static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
- int to_nodeid, int mstype,
- struct dlm_message **ms_ret,
- struct dlm_mhandle **mh_ret)
+static void create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
+ int to_nodeid, int mstype,
+ struct dlm_message **ms_ret,
+ struct dlm_mhandle **mh_ret)
{
int mb_len = sizeof(struct dlm_message);
@@ -3548,14 +3530,10 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
ms_ret, mh_ret);
}
-/* further lowcomms enhancements or alternate implementations may make
- the return value from this function useful at some point */
-
-static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
- const void *name, int namelen)
+static void send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
+ const void *name, int namelen)
{
dlm_midcomms_commit_mhandle(mh, name, namelen);
- return 0;
}
static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -3602,216 +3580,173 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
}
}
-static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
+static void send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int to_nodeid, error;
+ int to_nodeid;
to_nodeid = r->res_nodeid;
add_to_waiters(lkb, mstype, to_nodeid);
- error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
- if (error)
- goto fail;
+ create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
send_args(r, lkb, ms);
- error = send_message(mh, ms, r->res_name, r->res_length);
- if (error)
- goto fail;
- return 0;
-
- fail:
- remove_from_waiters(lkb, msg_reply_type(mstype));
- return error;
+ send_message(mh, ms, r->res_name, r->res_length);
}
-static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- return send_common(r, lkb, DLM_MSG_REQUEST);
+ send_common(r, lkb, DLM_MSG_REQUEST);
}
-static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- int error;
-
- error = send_common(r, lkb, DLM_MSG_CONVERT);
+ send_common(r, lkb, DLM_MSG_CONVERT);
/* down conversions go without a reply from the master */
- if (!error && down_conversion(lkb)) {
+ if (down_conversion(lkb)) {
remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
r->res_ls->ls_local_ms.m_result = 0;
__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
}
-
- return error;
}
/* FIXME: if this lkb is the only lock we hold on the rsb, then set
MASTER_UNCERTAIN to force the next request on the rsb to confirm
that the master is still correct. */
-static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- return send_common(r, lkb, DLM_MSG_UNLOCK);
+ send_common(r, lkb, DLM_MSG_UNLOCK);
}
-static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- return send_common(r, lkb, DLM_MSG_CANCEL);
+ send_common(r, lkb, DLM_MSG_CANCEL);
}
-static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int to_nodeid, error;
+ int to_nodeid;
to_nodeid = lkb->lkb_nodeid;
- error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
- if (error)
- goto out;
-
+ create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
send_args(r, lkb, ms);
ms->m_result = 0;
- error = send_message(mh, ms, r->res_name, r->res_length);
- out:
- return error;
+ send_message(mh, ms, r->res_name, r->res_length);
}
-static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
+static void send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int to_nodeid, error;
+ int to_nodeid;
to_nodeid = lkb->lkb_nodeid;
- error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
- if (error)
- goto out;
+ create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
send_args(r, lkb, ms);
ms->m_bastmode = cpu_to_le32(mode);
- error = send_message(mh, ms, r->res_name, r->res_length);
- out:
- return error;
+ send_message(mh, ms, r->res_name, r->res_length);
}
-static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int to_nodeid, error;
+ int to_nodeid;
to_nodeid = dlm_dir_nodeid(r);
add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
- error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
- if (error)
- goto fail;
+ create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
send_args(r, lkb, ms);
- error = send_message(mh, ms, r->res_name, r->res_length);
- if (error)
- goto fail;
- return 0;
-
- fail:
- remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
- return error;
+ send_message(mh, ms, r->res_name, r->res_length);
}
-static int send_remove(struct dlm_rsb *r)
+static void send_remove(struct dlm_rsb *r)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int to_nodeid, error;
+ int to_nodeid;
to_nodeid = dlm_dir_nodeid(r);
- error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
- if (error)
- goto out;
+ create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
memcpy(ms->m_extra, r->res_name, r->res_length);
ms->m_hash = cpu_to_le32(r->res_hash);
- error = send_message(mh, ms, r->res_name, r->res_length);
- out:
- return error;
+ send_message(mh, ms, r->res_name, r->res_length);
}
-static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
- int mstype, int rv)
+static void send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
+ int mstype, int rv)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int to_nodeid, error;
+ int to_nodeid;
to_nodeid = lkb->lkb_nodeid;
- error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
- if (error)
- goto out;
+ create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
send_args(r, lkb, ms);
ms->m_result = cpu_to_le32(to_dlm_errno(rv));
- error = send_message(mh, ms, r->res_name, r->res_length);
- out:
- return error;
+ send_message(mh, ms, r->res_name, r->res_length);
}
-static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
+static void send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
- return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
+ send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}
-static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
+static void send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
- return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
+ send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}
-static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
+static void send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
- return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
+ send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
}
-static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
+static void send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
- return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
+ send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}
-static int send_lookup_reply(struct dlm_ls *ls,
- const struct dlm_message *ms_in, int ret_nodeid,
- int rv)
+static void send_lookup_reply(struct dlm_ls *ls,
+ const struct dlm_message *ms_in, int ret_nodeid,
+ int rv)
{
struct dlm_rsb *r = &ls->ls_local_rsb;
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
+ int nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
- error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
- if (error)
- goto out;
+ create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
ms->m_lkid = ms_in->m_lkid;
ms->m_result = cpu_to_le32(to_dlm_errno(rv));
ms->m_nodeid = cpu_to_le32(ret_nodeid);
- error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
- out:
- return error;
+ send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
}
/* which args we save from a received message depends heavily on the type
@@ -6258,29 +6193,24 @@ static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
spin_unlock_bh(&ls->ls_orphans_lock);
}
-static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
+static void send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int error;
- error = _create_message(ls, sizeof(struct dlm_message), nodeid,
- DLM_MSG_PURGE, &ms, &mh);
- if (error)
- return error;
+ _create_message(ls, sizeof(struct dlm_message), nodeid,
+ DLM_MSG_PURGE, &ms, &mh);
ms->m_nodeid = cpu_to_le32(nodeid);
ms->m_pid = cpu_to_le32(pid);
- return send_message(mh, ms, NULL, 0);
+ send_message(mh, ms, NULL, 0);
}
-int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
- int nodeid, int pid)
+void dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
+ int nodeid, int pid)
{
- int error = 0;
-
if (nodeid && (nodeid != dlm_our_nodeid())) {
- error = send_purge(ls, nodeid, pid);
+ send_purge(ls, nodeid, pid);
} else {
dlm_lock_recovery(ls);
if (pid == current->pid)
@@ -6289,7 +6219,6 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
do_purge(ls, nodeid, pid);
dlm_unlock_recovery(ls);
}
- return error;
}
/* debug functionality */
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index b23d7b854ed4..45ffe149ae3b 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -55,8 +55,8 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
uint32_t flags, uint32_t lkid, char *lvb_in);
int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
uint32_t flags, uint32_t lkid);
-int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
- int nodeid, int pid);
+void dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
+ int nodeid, int pid);
int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid);
void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc);
int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 5cb3896be826..4b1f5679907f 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -390,16 +390,15 @@ static int device_user_purge(struct dlm_user_proc *proc,
struct dlm_purge_params *params)
{
struct dlm_ls *ls;
- int error;
ls = dlm_find_lockspace_local(proc->lockspace);
if (!ls)
return -ENOENT;
- error = dlm_user_purge(ls, proc, params->nodeid, params->pid);
+ dlm_user_purge(ls, proc, params->nodeid, params->pid);
dlm_put_lockspace(ls);
- return error;
+ return 0;
}
static int device_create_lockspace(struct dlm_lspace_params *params)
--
2.43.0
* [RFC dlm/next 09/11] dlm: introduce new lkb refcount model
2024-11-07 20:46 [RFC dlm/next 00/11] dlm: approach for new lkb reference counting Alexander Aring
` (7 preceding siblings ...)
2024-11-07 20:46 ` [RFC dlm/next 08/11] dlm: make send dlm message as non-failure Alexander Aring
@ 2024-11-07 20:46 ` Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 10/11] dlm: void convert, cancel and unlock requests Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 11/11] dlm: return void for _request_lock function Alexander Aring
10 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-11-07 20:46 UTC (permalink / raw)
To: teigland; +Cc: gfs2, aahringo
The current lkb refcounting requires evaluating the return values of
functions such as _request_lock(). We do that in some places and not in
others. Where we don't, the lock resource that the lkb references needs
to be associated with a remote lock master. Whether that is the case
requires a lot of thinking, as recovery can also change states.
The new lkb refcount model drops the need to evaluate the return value,
as it introduces an lkb request state model. An lkb has a specific
lifetime that is tied to the ast callback: depending on the lkb request
state and the ast result value, the callback signals whether the lkb
lifetime ends.
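As a rough userspace illustration of the paragraph above (not the kernel
code itself), the following sketch models how an ast result combined
with the request state decides whether the lifetime reference is
dropped. All sketch_* names, the plain integer refcount and the
SKETCH_ECANCEL/SKETCH_EUNLOCK values are assumptions made for this
example; the kernel uses a kref, WARN_ON() and the real DLM codes.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>

/* Stand-ins for the DLM-specific result codes (illustrative values). */
#define SKETCH_ECANCEL 0x10001
#define SKETCH_EUNLOCK 0x10002

enum sketch_rq_state {
	SKETCH_RQ_REQUEST,
	SKETCH_RQ_CONVERT,
	SKETCH_RQ_UNLOCK,
};

struct sketch_lkb {
	enum sketch_rq_state rq_state;
	int refs;	/* plain counter standing in for the kref */
	int warnings;	/* counts what would be WARN_ON(1) in the kernel */
};

/* Returns true if this ast result ended the lkb lifetime. */
static bool sketch_ast_may_put(struct sketch_lkb *lkb, int status)
{
	bool ends = false;

	switch (status) {
	case 0:
		/* a granted initial request moves on to the convert state */
		if (lkb->rq_state == SKETCH_RQ_REQUEST)
			lkb->rq_state = SKETCH_RQ_CONVERT;
		else if (lkb->rq_state != SKETCH_RQ_CONVERT)
			lkb->warnings++;
		break;
	case -SKETCH_ECANCEL:
	case -EAGAIN:
		/* ends the lifetime only for the initial request */
		if (lkb->rq_state == SKETCH_RQ_REQUEST)
			ends = true;
		else if (lkb->rq_state != SKETCH_RQ_CONVERT)
			lkb->warnings++;
		break;
	case -EDEADLK:
		/* only expected while converting */
		if (lkb->rq_state != SKETCH_RQ_CONVERT)
			lkb->warnings++;
		break;
	case -SKETCH_EUNLOCK:
		/* an unlock always ends the lifetime */
		if (lkb->rq_state == SKETCH_RQ_UNLOCK)
			lkb->warnings++;
		lkb->rq_state = SKETCH_RQ_UNLOCK;
		ends = true;
		break;
	default:
		lkb->warnings++;
		break;
	}

	if (ends) {
		assert(lkb->refs > 0);
		lkb->refs--;	/* drop the lifetime reference */
	}
	return ends;
}
```

In this sketch a fresh lkb that receives -EAGAIN loses its lifetime
reference immediately, while the same result after a successful grant
leaves the reference untouched.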
This new model is more robust against breakage, as we would notice a
missing ast callback more readily than a missing return value check of
e.g. _request_lock(). Additionally, we can check for invalid lkb request
states over the lifetime, or for ast result codes that are invalid in
the current state.
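The state checks mentioned above could be pictured roughly as the
following userspace sketch. The demo_* names are hypothetical. The
convert, cancel and unlock rows follow the WARN_ON() checks this patch
adds, while the request row is an assumption taken from the cover
letter ("a request cannot happen after a convert"). assert() stands in
for WARN_ON() here, although it aborts instead of only warning.

```c
#include <assert.h>
#include <stdbool.h>

enum demo_state {
	DEMO_STATE_REQUEST,
	DEMO_STATE_CONVERT,
	DEMO_STATE_UNLOCK,
};

enum demo_op {
	DEMO_OP_REQUEST,
	DEMO_OP_CONVERT,
	DEMO_OP_CANCEL,
	DEMO_OP_UNLOCK,
};

/* Is this operation valid while the lkb is in the given request state? */
static bool demo_op_allowed(enum demo_state state, enum demo_op op)
{
	switch (op) {
	case DEMO_OP_REQUEST:
		/* assumption: only a fresh lkb may issue the initial request */
		return state == DEMO_STATE_REQUEST;
	case DEMO_OP_CONVERT:
		/* converting requires a previously granted request */
		return state == DEMO_STATE_CONVERT;
	case DEMO_OP_CANCEL:
	case DEMO_OP_UNLOCK:
		/* allowed until the lifetime has ended by an unlock */
		return state == DEMO_STATE_REQUEST ||
		       state == DEMO_STATE_CONVERT;
	}
	return false;
}

/* Strict variant: abort on an invalid combination. */
static void demo_require_op(enum demo_state state, enum demo_op op)
{
	assert(demo_op_allowed(state, op));
}
```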
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/dlm_internal.h | 13 ++++
fs/dlm/lock.c | 146 ++++++++++++++++++++++++++++++++++--------
2 files changed, 132 insertions(+), 27 deletions(-)
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index d534a4bc162b..cd07077550c5 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -250,6 +250,17 @@ struct dlm_callback {
struct list_head list;
};
+/* These enum values represent the lkb lifetime states.
+ * An lkb starts in the initial request state; after that
+ * only conversions are allowed as requests, until the
+ * lifetime ends by calling unlock.
+ */
+enum dlm_lkb_rq_state {
+ DLM_LKB_RQ_STATE_REQUEST,
+ DLM_LKB_RQ_STATE_CONVERT,
+ DLM_LKB_RQ_STATE_UNLOCK,
+};
+
struct dlm_lkb {
struct dlm_rsb *lkb_resource; /* the rsb */
struct kref lkb_ref;
@@ -263,6 +274,8 @@ struct dlm_lkb {
unsigned long lkb_iflags; /* internal flags */
uint32_t lkb_lvbseq; /* lvb sequence number */
+ enum dlm_lkb_rq_state lkb_rq_state;
+
int8_t lkb_status; /* granted, waiting, convert */
int8_t lkb_rqmode; /* requested lock mode */
int8_t lkb_grmode; /* granted lock mode */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 9d2d3567bf9d..1de8598d7451 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -287,17 +287,99 @@ static inline int is_overlap(struct dlm_lkb *lkb)
test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
}
-static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
+static void dlm_lkb_queue_cb_may_put(struct dlm_lkb *lkb, int status)
{
- if (is_master_copy(lkb))
- return;
+ switch (status) {
+ case 0:
+ switch (lkb->lkb_rq_state) {
+ case DLM_LKB_RQ_STATE_REQUEST:
+ lkb->lkb_rq_state = DLM_LKB_RQ_STATE_CONVERT;
+ break;
+ case DLM_LKB_RQ_STATE_CONVERT:
+ break;
+ default:
+ WARN_ON(1);
+ break;
+ }
- DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
+ break;
+ case -DLM_ECANCEL:
+ switch (lkb->lkb_rq_state) {
+ case DLM_LKB_RQ_STATE_REQUEST:
+ /* cancel on a request will end the lkb lifetime
+ * as it is the initial lock request.
+ */
+ dlm_put_lkb(lkb);
+ break;
+ case DLM_LKB_RQ_STATE_CONVERT:
+ break;
+ default:
+ WARN_ON(1);
+ break;
+ }
+
+ break;
+ case -EAGAIN:
+ switch (lkb->lkb_rq_state) {
+ case DLM_LKB_RQ_STATE_REQUEST:
+ /* -EAGAIN ends the lkb lifetime as the initial lock
+ * request failed.
+ */
+ dlm_put_lkb(lkb);
+ break;
+ case DLM_LKB_RQ_STATE_CONVERT:
+ break;
+ default:
+ WARN_ON(1);
+ break;
+ }
+
+ break;
+ case -EDEADLK:
+ switch (lkb->lkb_rq_state) {
+ case DLM_LKB_RQ_STATE_CONVERT:
+ break;
+ default:
+ WARN_ON(1);
+ break;
+ }
+
+ break;
+ case -DLM_EUNLOCK:
+ switch (lkb->lkb_rq_state) {
+ case DLM_LKB_RQ_STATE_REQUEST:
+ fallthrough;
+ case DLM_LKB_RQ_STATE_CONVERT:
+ lkb->lkb_rq_state = DLM_LKB_RQ_STATE_UNLOCK;
+ break;
+ default:
+ WARN_ON(1);
+ break;
+ }
+
+ /* for any case an unlock ends the lifetime of a lkb */
+ dlm_put_lkb(lkb);
+ break;
+ default:
+ /* something we should check */
+ WARN_ON(1);
+ break;
+ }
+}
+static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
+{
if (rv == -DLM_ECANCEL &&
test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
rv = -EDEADLK;
+ dlm_lkb_queue_cb_may_put(lkb, rv);
+
+ if (is_master_copy(lkb))
+ return;
+
+ DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
+
dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
}
@@ -1495,6 +1577,8 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
if (!lkb)
return -ENOMEM;
+ /* every lkb starts with a request as init request state */
+ lkb->lkb_rq_state = DLM_LKB_RQ_STATE_REQUEST;
lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
lkb->lkb_last_cb_mode = DLM_LOCK_IV;
@@ -1982,9 +2066,6 @@ static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
del_lkb(r, lkb);
lkb->lkb_grmode = DLM_LOCK_IV;
- /* this unhold undoes the original ref from create_lkb()
- so this leads to the lkb being freed */
- unhold_lkb(lkb);
}
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -2018,9 +2099,6 @@ static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
case DLM_LKSTS_WAITING:
del_lkb(r, lkb);
lkb->lkb_grmode = DLM_LOCK_IV;
- /* this unhold undoes the original ref from create_lkb()
- so this leads to the lkb being freed */
- unhold_lkb(lkb);
rv = -1;
break;
default:
@@ -2070,6 +2148,7 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
grant_lock(r, lkb);
if (is_master_copy(lkb))
send_grant(r, lkb);
+
queue_cast(r, lkb, 0);
}
@@ -2888,7 +2967,6 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
queue_cast(lkb->lkb_resource, lkb,
args->flags & DLM_LKF_CANCEL ?
-DLM_ECANCEL : -DLM_EUNLOCK);
- unhold_lkb(lkb); /* undoes create_lkb() */
/* for lkb_rsb_lookup */
dlm_put_lkb(lkb);
}
@@ -3194,6 +3272,8 @@ static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
int error = 0;
+ WARN_ON(lkb->lkb_rq_state != DLM_LKB_RQ_STATE_CONVERT);
+
if (is_remote(r)) {
/* receive_convert() calls do_convert() on remote node */
send_convert(r, lkb);
@@ -3213,6 +3293,9 @@ static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
int error = 0;
+ WARN_ON(lkb->lkb_rq_state != DLM_LKB_RQ_STATE_REQUEST &&
+ lkb->lkb_rq_state != DLM_LKB_RQ_STATE_CONVERT);
+
if (is_remote(r)) {
/* receive_unlock() calls do_unlock() on remote node */
send_unlock(r, lkb);
@@ -3232,6 +3315,9 @@ static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
int error = 0;
+ WARN_ON(lkb->lkb_rq_state != DLM_LKB_RQ_STATE_REQUEST &&
+ lkb->lkb_rq_state != DLM_LKB_RQ_STATE_CONVERT);
+
if (is_remote(r)) {
/* receive_cancel() calls do_cancel() on remote node */
send_cancel(r, lkb);
@@ -3265,6 +3351,13 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
if (error)
return error;
+ /* this reference is for the lkb lifetime that can
+ * end on a ast callback, see dlm_lkb_queue_cb_may_put().
+ *
+ * at this point the request must return an ast callback.
+ */
+ hold_lkb(lkb);
+
lock_rsb(r);
attach_lkb(r, lkb);
@@ -3394,8 +3487,7 @@ int dlm_lock(dlm_lockspace_t *lockspace,
out_put:
trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
- if (convert || error)
- __put_lkb(ls, lkb);
+ __put_lkb(ls, lkb);
if (error == -EAGAIN || error == -EDEADLK)
error = 0;
out:
@@ -3968,13 +4060,15 @@ static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
send_request_reply(r, lkb, error);
do_request_effects(r, lkb, error);
+ /* no dlm_put_lkb() because the lkb is created and the request
+ * is sent out. The answer cannot be handled yet because we
+ * still hold the rsb lock. The initial refcount of 1 from
+ * create_lkb() is the lkb lifetime refcount.
+ */
+
unlock_rsb(r);
put_rsb(r);
- if (error == -EINPROGRESS)
- error = 0;
- if (error)
- dlm_put_lkb(lkb);
return 0;
fail:
@@ -4363,7 +4457,6 @@ static int receive_request_reply(struct dlm_ls *ls,
/* request would block (be queued) on remote master */
queue_cast(r, lkb, -EAGAIN);
confirm_master(r, -EAGAIN);
- unhold_lkb(lkb); /* undoes create_lkb() */
break;
case -EINPROGRESS:
@@ -4402,7 +4495,6 @@ static int receive_request_reply(struct dlm_ls *ls,
/* we'll ignore error in cancel/unlock reply */
queue_cast_overlap(r, lkb);
confirm_master(r, result);
- unhold_lkb(lkb); /* undoes create_lkb() */
} else {
_request_lock(r, lkb);
@@ -4688,7 +4780,6 @@ static void receive_lookup_reply(struct dlm_ls *ls,
log_debug(ls, "receive_lookup_reply %x unlock %x",
lkb->lkb_id, dlm_iflags_val(lkb));
queue_cast_overlap(r, lkb);
- unhold_lkb(lkb); /* undoes create_lkb() */
goto out_list;
}
@@ -5215,6 +5306,7 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
/* Forcibly remove from waiters list */
spin_lock_bh(&ls->ls_waiters_lock);
+ /* above unhold_lkb() contains waiters list ref */
list_del_init(&lkb->lkb_wait_reply);
spin_unlock_bh(&ls->ls_waiters_lock);
@@ -5231,7 +5323,6 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
case DLM_MSG_REQUEST:
queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
-DLM_ECANCEL);
- unhold_lkb(lkb); /* undoes create_lkb() */
break;
case DLM_MSG_CONVERT:
if (oc) {
@@ -5268,6 +5359,7 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
}
unlock_rsb(r);
put_rsb(r);
+ /* for find_resend_waiter() */
dlm_put_lkb(lkb);
}
@@ -5686,7 +5778,6 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
{
struct dlm_lkb *lkb;
struct dlm_args args;
- bool do_put = true;
int error;
dlm_lock_recovery(ls);
@@ -5740,11 +5831,9 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
hold_lkb(lkb);
list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
spin_unlock_bh(&ua->proc->locks_spin);
- do_put = false;
out_put:
trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
- if (do_put)
- __put_lkb(ls, lkb);
+ __put_lkb(ls, lkb);
out:
dlm_unlock_recovery(ls);
return error;
@@ -6062,8 +6151,11 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
}
/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
- (which does lock_rsb) due to deadlock with receiving a message that does
- lock_rsb followed by dlm_user_add_cb() */
+ * (which does lock_rsb) due to deadlock with receiving a message that does
+ * lock_rsb followed by dlm_user_add_cb()
+ *
+ * caller needs to call dlm_put_lkb() when lkb returns
+ */
static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
struct dlm_user_proc *proc)
--
2.43.0
* [RFC dlm/next 10/11] dlm: void convert, cancel and unlock requests
2024-11-07 20:46 [RFC dlm/next 00/11] dlm: approach for new lkb reference counting Alexander Aring
` (8 preceding siblings ...)
2024-11-07 20:46 ` [RFC dlm/next 09/11] dlm: introduce new lkb refcount model Alexander Aring
@ 2024-11-07 20:46 ` Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 11/11] dlm: return void for _request_lock function Alexander Aring
10 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-11-07 20:46 UTC (permalink / raw)
To: teigland; +Cc: gfs2, aahringo
As the new lkb refcount model does not require evaluating the return
value of request functions, we can change those functions to return no
value. Previously the return value was always zeroed out whenever a
non-zero value was returned. This is no longer necessary.
The _request_lock() function needs additional handling, as the user
request functionality still needs to evaluate the return value.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 37 ++++++++++++-------------------------
1 file changed, 12 insertions(+), 25 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 1de8598d7451..28c312410527 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -84,7 +84,7 @@ static void send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static void send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
-static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
const struct dlm_message *ms, bool local);
static int receive_extralen(const struct dlm_message *ms);
@@ -3268,9 +3268,9 @@ static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
/* change some property of an existing lkb, e.g. mode */
-static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- int error = 0;
+ int error;
WARN_ON(lkb->lkb_rq_state != DLM_LKB_RQ_STATE_CONVERT);
@@ -3283,15 +3283,13 @@ static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
between do_convert and do_convert_effects */
do_convert_effects(r, lkb, error);
}
-
- return error;
}
/* remove an existing lkb from the granted queue */
-static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- int error = 0;
+ int error;
WARN_ON(lkb->lkb_rq_state != DLM_LKB_RQ_STATE_REQUEST &&
lkb->lkb_rq_state != DLM_LKB_RQ_STATE_CONVERT);
@@ -3305,15 +3303,13 @@ static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
between do_unlock and do_unlock_effects */
do_unlock_effects(r, lkb, error);
}
-
- return error;
}
/* remove an existing lkb from the convert or wait queue */
-static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- int error = 0;
+ int error;
WARN_ON(lkb->lkb_rq_state != DLM_LKB_RQ_STATE_REQUEST &&
lkb->lkb_rq_state != DLM_LKB_RQ_STATE_CONVERT);
@@ -3327,8 +3323,6 @@ static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
between do_cancel and do_cancel_effects */
do_cancel_effects(r, lkb, error);
}
-
- return error;
}
/*
@@ -3385,7 +3379,7 @@ static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
if (error)
goto out;
- error = _convert_lock(r, lkb);
+ _convert_lock(r, lkb);
out:
unlock_rsb(r);
put_rsb(r);
@@ -3407,7 +3401,7 @@ static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
if (error)
goto out;
- error = _unlock_lock(r, lkb);
+ _unlock_lock(r, lkb);
out:
unlock_rsb(r);
put_rsb(r);
@@ -3429,7 +3423,7 @@ static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
if (error)
goto out;
- error = _cancel_lock(r, lkb);
+ _cancel_lock(r, lkb);
out:
unlock_rsb(r);
put_rsb(r);
@@ -3482,13 +3476,11 @@ int dlm_lock(dlm_lockspace_t *lockspace,
else
error = request_lock(ls, lkb, name, namelen, &args);
- if (error == -EINPROGRESS)
- error = 0;
out_put:
trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
__put_lkb(ls, lkb);
- if (error == -EAGAIN || error == -EDEADLK)
+ if (error == -EAGAIN || error == -EINPROGRESS)
error = 0;
out:
dlm_unlock_recovery(ls);
@@ -3528,8 +3520,6 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
else
error = unlock_lock(ls, lkb, &args);
- if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
- error = 0;
if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
error = 0;
out_put:
@@ -4497,7 +4487,6 @@ static int receive_request_reply(struct dlm_ls *ls,
confirm_master(r, result);
} else {
_request_lock(r, lkb);
-
if (r->res_master_nodeid == dlm_our_nodeid())
confirm_master(r, 0);
}
@@ -5884,8 +5873,6 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
error = convert_lock(ls, lkb, &args);
- if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
- error = 0;
out_put:
trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
dlm_put_lkb(lkb);
@@ -6092,7 +6079,7 @@ int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
goto out_r;
set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
- error = _cancel_lock(r, lkb);
+ _cancel_lock(r, lkb);
out_r:
unlock_rsb(r);
put_rsb(r);
--
2.43.0
* [RFC dlm/next 11/11] dlm: return void for _request_lock function
2024-11-07 20:46 [RFC dlm/next 00/11] dlm: approach for new lkb reference counting Alexander Aring
` (9 preceding siblings ...)
2024-11-07 20:46 ` [RFC dlm/next 10/11] dlm: void convert, cancel and unlock requests Alexander Aring
@ 2024-11-07 20:46 ` Alexander Aring
10 siblings, 0 replies; 12+ messages in thread
From: Alexander Aring @ 2024-11-07 20:46 UTC (permalink / raw)
To: teigland; +Cc: gfs2, aahringo
Change the _request_lock() function so that it cannot fail, as the
recent changes to the lkb refcounting model no longer require evaluating
the return value to decide whether to call a put.
The dlm_user_request() function still needs to know whether the result
was -EAGAIN, because the lkb lifetime ends in that case and the lkb must
not be added to the ownqueue. As this is the only such user, we use a
call-by-reference result parameter that lets the caller know whether the
lkb lifetime ended immediately on the lock request, as
dlm_user_request() requires.
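The optional result parameter described above follows a common C
pattern, sketched here in userspace. The sketch_* functions and the
would_block flag are hypothetical stand-ins for _request_lock() and
dlm_user_request(); only the "write the result if the pointer is
non-NULL" idea is taken from the patch.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for _request_lock(): cannot fail, optionally reports a result. */
static void sketch_request(bool would_block, int *result)
{
	int error = would_block ? -EAGAIN : 0;

	/* internal callers pass NULL because they do not care */
	if (result)
		*result = error;
}

/* Stand-in for the one caller that cares, modelled on dlm_user_request().
 * Returns true if the lkb should be added to the per-process ownqueue.
 */
static bool sketch_user_request(bool would_block)
{
	int result = 0;

	sketch_request(would_block, &result);
	assert(result == 0 || result == -EAGAIN);

	/* -EAGAIN already ended the lkb lifetime, so do not track it */
	return result != -EAGAIN;
}
```

Callers that do not need the result, such as the recovery path in the
patch, would simply call sketch_request(would_block, NULL).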
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 49 ++++++++++++++++++++++---------------------------
1 file changed, 22 insertions(+), 27 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 28c312410527..10058c89c811 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -83,7 +83,7 @@ static void send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static void send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void send_remove(struct dlm_rsb *r);
-static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, int *result);
static void _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
const struct dlm_message *ms, bool local);
@@ -2761,7 +2761,7 @@ static void process_lookup_list(struct dlm_rsb *r)
list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
list_del_init(&lkb->lkb_rsb_lookup);
- _request_lock(r, lkb);
+ _request_lock(r, lkb, NULL);
dlm_put_lkb(lkb);
}
}
@@ -2796,7 +2796,7 @@ static void confirm_master(struct dlm_rsb *r, int error)
lkb_rsb_lookup);
list_del_init(&lkb->lkb_rsb_lookup);
r->res_first_lkid = lkb->lkb_id;
- _request_lock(r, lkb);
+ _request_lock(r, lkb, NULL);
dlm_put_lkb(lkb);
}
break;
@@ -3241,7 +3241,7 @@ static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
/* add a new lkb to a possibly new rsb, called by requesting process */
-static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, int *result)
{
int error;
@@ -3263,7 +3263,8 @@ static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
do_request_effects(r, lkb, error);
}
out:
- return error;
+ if (result)
+ *result = error;
}
/* change some property of an existing lkb, e.g. mode */
@@ -3332,7 +3333,7 @@ static void _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
const void *name, int len,
- struct dlm_args *args)
+ struct dlm_args *args, int *result)
{
struct dlm_rsb *r;
int error;
@@ -3357,11 +3358,11 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
attach_lkb(r, lkb);
lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
- error = _request_lock(r, lkb);
+ _request_lock(r, lkb, result);
unlock_rsb(r);
put_rsb(r);
- return error;
+ return 0;
}
static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
@@ -3474,14 +3475,12 @@ int dlm_lock(dlm_lockspace_t *lockspace,
if (convert)
error = convert_lock(ls, lkb, &args);
else
- error = request_lock(ls, lkb, name, namelen, &args);
+ error = request_lock(ls, lkb, name, namelen, &args, NULL);
out_put:
trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
__put_lkb(ls, lkb);
- if (error == -EAGAIN || error == -EINPROGRESS)
- error = 0;
out:
dlm_unlock_recovery(ls);
dlm_put_lockspace(ls);
@@ -4486,7 +4485,7 @@ static int receive_request_reply(struct dlm_ls *ls,
queue_cast_overlap(r, lkb);
confirm_master(r, result);
} else {
- _request_lock(r, lkb);
+ _request_lock(r, lkb, NULL);
if (r->res_master_nodeid == dlm_our_nodeid())
confirm_master(r, 0);
}
@@ -4772,7 +4771,7 @@ static void receive_lookup_reply(struct dlm_ls *ls,
goto out_list;
}
- _request_lock(r, lkb);
+ _request_lock(r, lkb, NULL);
out_list:
if (do_lookup_list)
@@ -5328,7 +5327,7 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
switch (mstype) {
case DLM_MSG_LOOKUP:
case DLM_MSG_REQUEST:
- _request_lock(r, lkb);
+ _request_lock(r, lkb, NULL);
if (r->res_nodeid != -1 && is_master(r))
confirm_master(r, 0);
break;
@@ -5767,7 +5766,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
{
struct dlm_lkb *lkb;
struct dlm_args args;
- int error;
+ int error, result;
dlm_lock_recovery(ls);
@@ -5800,20 +5799,16 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
lock and that lkb_astparam is the dlm_user_args structure. */
set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
- error = request_lock(ls, lkb, name, namelen, &args);
+ error = request_lock(ls, lkb, name, namelen, &args, &result);
+ if (error)
+ goto out_put;
- switch (error) {
- case 0:
- break;
- case -EINPROGRESS:
- error = 0;
- break;
- case -EAGAIN:
- error = 0;
- fallthrough;
- default:
+ /* the lkb lifetime ended because request_lock() handled
+ * internally an -EAGAIN, so we don't add it to the
+ * ownqueue.
+ */
+ if (result == -EAGAIN)
goto out_put;
- }
/* add this new lkb to the per-process list of locks */
spin_lock_bh(&ua->proc->locks_spin);
--
2.43.0
Thread overview: 12+ messages
2024-11-07 20:46 [RFC dlm/next 00/11] dlm: approach for new lkb reference counting Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 01/11] dlm: remove set_master() negative return check Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 02/11] dlm: use move_lkb() instead del/add lkb Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 03/11] dlm: use hold_lkb() instead kref_get() Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 04/11] dlm: don't track references on move_lkb() Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 05/11] dlm: drop lkb hold for waiter conversion handling Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 06/11] dlm: track reference for lkb_rsb_lookup Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 07/11] dlm: call queue_cast() on master copy as well Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 08/11] dlm: make send dlm message as non-failure Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 09/11] dlm: introduce new lkb refcount model Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 10/11] dlm: void convert, cancel and unlock requests Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 11/11] dlm: return void for _request_lock function Alexander Aring