From: Alexander Aring <aahringo@redhat.com>
To: teigland@redhat.com
Cc: gfs2@lists.linux.dev, aahringo@redhat.com
Subject: [RFC dlm/next 08/11] dlm: make send dlm message as non-failure
Date: Thu, 7 Nov 2024 15:46:14 -0500 [thread overview]
Message-ID: <20241107204617.147842-9-aahringo@redhat.com> (raw)
In-Reply-To: <20241107204617.147842-1-aahringo@redhat.com>
We cannot handle if any DLM message being send out fails so we change
every send message as it cannot fail.
There might be in future other ways to handle send failures internally
but from a caller prospective it should never fail.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lock.c | 235 ++++++++++++++++++--------------------------------
fs/dlm/lock.h | 4 +-
fs/dlm/user.c | 5 +-
3 files changed, 86 insertions(+), 158 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index f2060a9d78f3..9d2d3567bf9d 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -75,14 +75,14 @@
#include "user.h"
#include "config.h"
-static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
-static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
-static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
-static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
-static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
-static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
-static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
-static int send_remove(struct dlm_rsb *r);
+static void send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
+static void send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -1693,23 +1693,6 @@ static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
add_lkb_noref(r, lkb, sts);
}
-static int msg_reply_type(int mstype)
-{
- switch (mstype) {
- case DLM_MSG_REQUEST:
- return DLM_MSG_REQUEST_REPLY;
- case DLM_MSG_CONVERT:
- return DLM_MSG_CONVERT_REPLY;
- case DLM_MSG_UNLOCK:
- return DLM_MSG_UNLOCK_REPLY;
- case DLM_MSG_CANCEL:
- return DLM_MSG_CANCEL_REPLY;
- case DLM_MSG_LOOKUP:
- return DLM_MSG_LOOKUP_REPLY;
- }
- return -1;
-}
-
/* add/remove lkb from global waiters list of lkb's waiting for
a reply from a remote node */
@@ -3194,7 +3177,7 @@ static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
if (is_remote(r)) {
/* receive_request() calls do_request() on remote node */
- error = send_request(r, lkb);
+ send_request(r, lkb);
} else {
error = do_request(r, lkb);
/* for remote locks the request_reply is sent
@@ -3209,11 +3192,11 @@ static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- int error;
+ int error = 0;
if (is_remote(r)) {
/* receive_convert() calls do_convert() on remote node */
- error = send_convert(r, lkb);
+ send_convert(r, lkb);
} else {
error = do_convert(r, lkb);
/* for remote locks the convert_reply is sent
@@ -3228,11 +3211,11 @@ static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- int error;
+ int error = 0;
if (is_remote(r)) {
/* receive_unlock() calls do_unlock() on remote node */
- error = send_unlock(r, lkb);
+ send_unlock(r, lkb);
} else {
error = do_unlock(r, lkb);
/* for remote locks the unlock_reply is sent
@@ -3247,11 +3230,11 @@ static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- int error;
+ int error = 0;
if (is_remote(r)) {
/* receive_cancel() calls do_cancel() on remote node */
- error = send_cancel(r, lkb);
+ send_cancel(r, lkb);
} else {
error = do_cancel(r, lkb);
/* for remote locks the cancel_reply is sent
@@ -3489,10 +3472,10 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
* receive_lookup_reply send_lookup_reply
*/
-static int _create_message(struct dlm_ls *ls, int mb_len,
- int to_nodeid, int mstype,
- struct dlm_message **ms_ret,
- struct dlm_mhandle **mh_ret)
+static void _create_message(struct dlm_ls *ls, int mb_len,
+ int to_nodeid, int mstype,
+ struct dlm_message **ms_ret,
+ struct dlm_mhandle **mh_ret)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
@@ -3503,8 +3486,8 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
write our data into */
mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
- if (!mh)
- return -ENOBUFS;
+ if (WARN_ON(!mh))
+ return;
ms = (struct dlm_message *) mb;
@@ -3518,13 +3501,12 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
*mh_ret = mh;
*ms_ret = ms;
- return 0;
}
-static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
- int to_nodeid, int mstype,
- struct dlm_message **ms_ret,
- struct dlm_mhandle **mh_ret)
+static void create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
+ int to_nodeid, int mstype,
+ struct dlm_message **ms_ret,
+ struct dlm_mhandle **mh_ret)
{
int mb_len = sizeof(struct dlm_message);
@@ -3548,14 +3530,10 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
ms_ret, mh_ret);
}
-/* further lowcomms enhancements or alternate implementations may make
- the return value from this function useful at some point */
-
-static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
- const void *name, int namelen)
+static void send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
+ const void *name, int namelen)
{
dlm_midcomms_commit_mhandle(mh, name, namelen);
- return 0;
}
static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -3602,216 +3580,173 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
}
}
-static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
+static void send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int to_nodeid, error;
+ int to_nodeid;
to_nodeid = r->res_nodeid;
add_to_waiters(lkb, mstype, to_nodeid);
- error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
- if (error)
- goto fail;
+ create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
send_args(r, lkb, ms);
- error = send_message(mh, ms, r->res_name, r->res_length);
- if (error)
- goto fail;
- return 0;
-
- fail:
- remove_from_waiters(lkb, msg_reply_type(mstype));
- return error;
+ send_message(mh, ms, r->res_name, r->res_length);
}
-static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- return send_common(r, lkb, DLM_MSG_REQUEST);
+ send_common(r, lkb, DLM_MSG_REQUEST);
}
-static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- int error;
-
- error = send_common(r, lkb, DLM_MSG_CONVERT);
+ send_common(r, lkb, DLM_MSG_CONVERT);
/* down conversions go without a reply from the master */
- if (!error && down_conversion(lkb)) {
+ if (down_conversion(lkb)) {
remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
r->res_ls->ls_local_ms.m_result = 0;
__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
}
-
- return error;
}
/* FIXME: if this lkb is the only lock we hold on the rsb, then set
MASTER_UNCERTAIN to force the next request on the rsb to confirm
that the master is still correct. */
-static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- return send_common(r, lkb, DLM_MSG_UNLOCK);
+ send_common(r, lkb, DLM_MSG_UNLOCK);
}
-static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
- return send_common(r, lkb, DLM_MSG_CANCEL);
+ send_common(r, lkb, DLM_MSG_CANCEL);
}
-static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int to_nodeid, error;
+ int to_nodeid;
to_nodeid = lkb->lkb_nodeid;
- error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
- if (error)
- goto out;
-
+ create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
send_args(r, lkb, ms);
ms->m_result = 0;
- error = send_message(mh, ms, r->res_name, r->res_length);
- out:
- return error;
+ send_message(mh, ms, r->res_name, r->res_length);
}
-static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
+static void send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int to_nodeid, error;
+ int to_nodeid;
to_nodeid = lkb->lkb_nodeid;
- error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
- if (error)
- goto out;
+ create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
send_args(r, lkb, ms);
ms->m_bastmode = cpu_to_le32(mode);
- error = send_message(mh, ms, r->res_name, r->res_length);
- out:
- return error;
+ send_message(mh, ms, r->res_name, r->res_length);
}
-static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static void send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int to_nodeid, error;
+ int to_nodeid;
to_nodeid = dlm_dir_nodeid(r);
add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
- error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
- if (error)
- goto fail;
+ create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
send_args(r, lkb, ms);
- error = send_message(mh, ms, r->res_name, r->res_length);
- if (error)
- goto fail;
- return 0;
-
- fail:
- remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
- return error;
+ send_message(mh, ms, r->res_name, r->res_length);
}
-static int send_remove(struct dlm_rsb *r)
+static void send_remove(struct dlm_rsb *r)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int to_nodeid, error;
+ int to_nodeid;
to_nodeid = dlm_dir_nodeid(r);
- error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
- if (error)
- goto out;
+ create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
memcpy(ms->m_extra, r->res_name, r->res_length);
ms->m_hash = cpu_to_le32(r->res_hash);
- error = send_message(mh, ms, r->res_name, r->res_length);
- out:
- return error;
+ send_message(mh, ms, r->res_name, r->res_length);
}
-static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
- int mstype, int rv)
+static void send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
+ int mstype, int rv)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int to_nodeid, error;
+ int to_nodeid;
to_nodeid = lkb->lkb_nodeid;
- error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
- if (error)
- goto out;
+ create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
send_args(r, lkb, ms);
ms->m_result = cpu_to_le32(to_dlm_errno(rv));
- error = send_message(mh, ms, r->res_name, r->res_length);
- out:
- return error;
+ send_message(mh, ms, r->res_name, r->res_length);
}
-static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
+static void send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
- return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
+ send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}
-static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
+static void send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
- return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
+ send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}
-static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
+static void send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
- return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
+ send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
}
-static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
+static void send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
- return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
+ send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}
-static int send_lookup_reply(struct dlm_ls *ls,
- const struct dlm_message *ms_in, int ret_nodeid,
- int rv)
+static void send_lookup_reply(struct dlm_ls *ls,
+ const struct dlm_message *ms_in, int ret_nodeid,
+ int rv)
{
struct dlm_rsb *r = &ls->ls_local_rsb;
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
+ int nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
- error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
- if (error)
- goto out;
+ create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
ms->m_lkid = ms_in->m_lkid;
ms->m_result = cpu_to_le32(to_dlm_errno(rv));
ms->m_nodeid = cpu_to_le32(ret_nodeid);
- error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
- out:
- return error;
+ send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
}
/* which args we save from a received message depends heavily on the type
@@ -6258,29 +6193,24 @@ static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
spin_unlock_bh(&ls->ls_orphans_lock);
}
-static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
+static void send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
- int error;
- error = _create_message(ls, sizeof(struct dlm_message), nodeid,
- DLM_MSG_PURGE, &ms, &mh);
- if (error)
- return error;
+ _create_message(ls, sizeof(struct dlm_message), nodeid,
+ DLM_MSG_PURGE, &ms, &mh);
ms->m_nodeid = cpu_to_le32(nodeid);
ms->m_pid = cpu_to_le32(pid);
- return send_message(mh, ms, NULL, 0);
+ send_message(mh, ms, NULL, 0);
}
-int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
- int nodeid, int pid)
+void dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
+ int nodeid, int pid)
{
- int error = 0;
-
if (nodeid && (nodeid != dlm_our_nodeid())) {
- error = send_purge(ls, nodeid, pid);
+ send_purge(ls, nodeid, pid);
} else {
dlm_lock_recovery(ls);
if (pid == current->pid)
@@ -6289,7 +6219,6 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
do_purge(ls, nodeid, pid);
dlm_unlock_recovery(ls);
}
- return error;
}
/* debug functionality */
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index b23d7b854ed4..45ffe149ae3b 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -55,8 +55,8 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
uint32_t flags, uint32_t lkid, char *lvb_in);
int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
uint32_t flags, uint32_t lkid);
-int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
- int nodeid, int pid);
+void dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
+ int nodeid, int pid);
int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid);
void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc);
int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 5cb3896be826..4b1f5679907f 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -390,16 +390,15 @@ static int device_user_purge(struct dlm_user_proc *proc,
struct dlm_purge_params *params)
{
struct dlm_ls *ls;
- int error;
ls = dlm_find_lockspace_local(proc->lockspace);
if (!ls)
return -ENOENT;
- error = dlm_user_purge(ls, proc, params->nodeid, params->pid);
+ dlm_user_purge(ls, proc, params->nodeid, params->pid);
dlm_put_lockspace(ls);
- return error;
+ return 0;
}
static int device_create_lockspace(struct dlm_lspace_params *params)
--
2.43.0
next prev parent reply other threads:[~2024-11-07 20:46 UTC|newest]
Thread overview: 12+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-11-07 20:46 [RFC dlm/next 00/11] dlm: approach for new lkb reference counting Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 01/11] dlm: remove set_master() negative return check Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 02/11] dlm: use move_lkb() instead del/add lkb Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 03/11] dlm: use hold_lkb() instead kref_get() Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 04/11] dlm: don't track references on move_lkb() Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 05/11] dlm: drop lkb hold for waiter conversion handling Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 06/11] dlm: track reference for lkb_rsb_lookup Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 07/11] dlm: call queue_cast() on master copy as well Alexander Aring
2024-11-07 20:46 ` Alexander Aring [this message]
2024-11-07 20:46 ` [RFC dlm/next 09/11] dlm: introduce new lkb refcount model Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 10/11] dlm: void convert, cancel and unlock requests Alexander Aring
2024-11-07 20:46 ` [RFC dlm/next 11/11] dlm: return void for _request_lock function Alexander Aring
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20241107204617.147842-9-aahringo@redhat.com \
--to=aahringo@redhat.com \
--cc=gfs2@lists.linux.dev \
--cc=teigland@redhat.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox