From: Alexander Aring <aahringo@redhat.com>
To: teigland@redhat.com
Cc: gfs2@lists.linux.dev, aahringo@redhat.com
Subject: [PATCH v6.10-rc1 10/11] dlm: merge nodeid field to master_nodeid
Date: Tue, 28 May 2024 17:12:40 -0400 [thread overview]
Message-ID: <20240528211241.2140441-10-aahringo@redhat.com> (raw)
In-Reply-To: <20240528211241.2140441-1-aahringo@redhat.com>
There is a TODO in dlm_internal.h for the dlm_rsb struct:
"remove res_nodeid and only use res_master_nodeid"
The res_nodeid and res_master_nodeid fields represent the same
information using different number mappings. This patch works on
this TODO by removing res_nodeid and using res_master_nodeid only. The
same applies to struct dlm_lkb with its lkb_nodeid and lkb_master_nodeid
fields. It uses the new representation for those numbers and introduces
backwards compatibility because the old representation is still used for
debugfs and the DLM protocol. In the future we might use the new number
representation for res_master_nodeid only and remove the copied value in
struct dlm_lkb.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/debug_fs.c | 33 +++++-----
fs/dlm/dlm_internal.h | 33 ++++++++--
fs/dlm/lock.c | 144 ++++++++++++++++++------------------------
fs/dlm/lock.h | 5 +-
fs/dlm/lockspace.c | 2 +-
fs/dlm/rcom.c | 4 +-
fs/dlm/recover.c | 20 ++----
fs/dlm/recoverd.c | 2 +-
8 files changed, 119 insertions(+), 124 deletions(-)
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 6ab3ed4074c6..40eb1696f3b6 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -58,9 +58,10 @@ static void print_format1_lock(struct seq_file *s, struct dlm_lkb *lkb,
lkb->lkb_status == DLM_LKSTS_WAITING)
seq_printf(s, " (%s)", print_lockmode(lkb->lkb_rqmode));
- if (lkb->lkb_nodeid) {
- if (lkb->lkb_nodeid != res->res_nodeid)
- seq_printf(s, " Remote: %3d %08x", lkb->lkb_nodeid,
+ if (lkb->lkb_master_nodeid != dlm_our_nodeid()) {
+ if (lkb->lkb_master_nodeid != res->res_master_nodeid)
+ seq_printf(s, " Remote: %3d %08x",
+ dlm_res_nodeid(lkb->lkb_master_nodeid),
lkb->lkb_remid);
else
seq_printf(s, " Master: %08x", lkb->lkb_remid);
@@ -88,16 +89,15 @@ static void print_format1(struct dlm_rsb *res, struct seq_file *s)
seq_printf(s, "%c", '.');
}
- if (res->res_nodeid > 0)
- seq_printf(s, "\"\nLocal Copy, Master is node %d\n",
- res->res_nodeid);
- else if (res->res_nodeid == 0)
- seq_puts(s, "\"\nMaster Copy\n");
- else if (res->res_nodeid == -1)
+
+ if (res->res_master_nodeid == 0)
seq_printf(s, "\"\nLooking up master (lkid %x)\n",
res->res_first_lkid);
+ else if (res->res_master_nodeid == dlm_our_nodeid())
+ seq_puts(s, "\"\nMaster Copy\n");
else
- seq_printf(s, "\"\nInvalid master %d\n", res->res_nodeid);
+ seq_printf(s, "\"\nLocal Copy, Master is node %d\n",
+ res->res_master_nodeid);
if (seq_has_overflowed(s))
goto out;
@@ -184,7 +184,7 @@ static void print_format2_lock(struct seq_file *s, struct dlm_lkb *lkb,
seq_printf(s, "%x %d %x %u %llu %x %x %d %d %d %llu %u %d \"%s\"\n",
lkb->lkb_id,
- lkb->lkb_nodeid,
+ dlm_res_nodeid(lkb->lkb_master_nodeid),
lkb->lkb_remid,
lkb->lkb_ownpid,
(unsigned long long)xid,
@@ -194,7 +194,7 @@ static void print_format2_lock(struct seq_file *s, struct dlm_lkb *lkb,
lkb->lkb_grmode,
lkb->lkb_rqmode,
(unsigned long long)us,
- r->res_nodeid,
+ dlm_res_nodeid(r->res_master_nodeid),
r->res_length,
r->res_name);
}
@@ -238,7 +238,7 @@ static void print_format3_lock(struct seq_file *s, struct dlm_lkb *lkb,
seq_printf(s, "lkb %x %d %x %u %llu %x %x %d %d %d %d %d %d %u %llu %llu\n",
lkb->lkb_id,
- lkb->lkb_nodeid,
+ dlm_res_nodeid(lkb->lkb_master_nodeid),
lkb->lkb_remid,
lkb->lkb_ownpid,
(unsigned long long)xid,
@@ -265,7 +265,7 @@ static void print_format3(struct dlm_rsb *r, struct seq_file *s)
seq_printf(s, "rsb %p %d %x %lx %d %d %u %d ",
r,
- r->res_nodeid,
+ dlm_res_nodeid(r->res_master_nodeid),
r->res_first_lkid,
r->res_flags,
!list_empty(&r->res_root_list),
@@ -341,7 +341,7 @@ static void print_format4(struct dlm_rsb *r, struct seq_file *s)
seq_printf(s, "rsb %p %d %d %d %d %lu %lx %d ",
r,
- r->res_nodeid,
+ dlm_res_nodeid(r->res_master_nodeid),
r->res_master_nodeid,
r->res_dir_nodeid,
our_nodeid,
@@ -611,7 +611,8 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
ret = snprintf(debug_buf + pos, len - pos, "%x %d %d %s\n",
lkb->lkb_id, lkb->lkb_wait_type,
- lkb->lkb_nodeid, lkb->lkb_resource->res_name);
+ dlm_res_nodeid(lkb->lkb_master_nodeid),
+ lkb->lkb_resource->res_name);
if (ret >= len - pos)
break;
pos += ret;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 78960d914f68..b3b9aa7dbaa4 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -253,7 +253,7 @@ struct dlm_callback {
struct dlm_lkb {
struct dlm_rsb *lkb_resource; /* the rsb */
struct kref lkb_ref;
- int lkb_nodeid; /* copied from rsb */
+ unsigned int lkb_master_nodeid; /* copied from rsb */
int lkb_ownpid; /* pid of lock owner */
uint32_t lkb_id; /* our lock ID */
uint32_t lkb_remid; /* lock ID on remote partner */
@@ -303,18 +303,41 @@ struct dlm_lkb {
*
* res_nodeid is "odd": -1 is unset/invalid, zero means our_nodeid,
* greater than zero when another nodeid.
- *
- * (TODO: remove res_nodeid and only use res_master_nodeid)
*/
+/* For backwards compatibility, see above.
+ * The protocol is still using res_nodeid.
+ */
+static inline int dlm_res_nodeid(unsigned int res_master_nodeid)
+{
+ if (res_master_nodeid == 0)
+ return -1;
+ else if (res_master_nodeid == dlm_our_nodeid())
+ return 0;
+ else
+ return res_master_nodeid;
+}
+
+/* For backwards compatibility, see above.
+ * The protocol is still using res_nodeid.
+ */
+static inline unsigned int dlm_res_master_nodeid(int res_nodeid)
+{
+ if (res_nodeid == -1)
+ return 0;
+ else if (res_nodeid == 0)
+ return dlm_our_nodeid();
+ else
+ return res_nodeid;
+}
+
struct dlm_rsb {
struct dlm_ls *res_ls; /* the lockspace */
struct kref res_ref;
spinlock_t res_lock;
unsigned long res_flags;
int res_length; /* length of rsb name */
- int res_nodeid;
- int res_master_nodeid;
+ unsigned int res_master_nodeid;
int res_dir_nodeid;
unsigned long res_id; /* for ls_recover_xa */
uint32_t res_lvbseq;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index e176cf48011a..a044edc2d143 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -160,21 +160,19 @@ static const int __quecvt_compat_matrix[8][8] = {
void dlm_print_lkb(struct dlm_lkb *lkb)
{
- printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
+ pr_err("lkb: master_nodeid %d id %x remid %x exflags %x flags %x "
"sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
- lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
- dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
- lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
- (unsigned long long)lkb->lkb_recover_seq);
+ lkb->lkb_master_nodeid, lkb->lkb_id, lkb->lkb_remid,
+ lkb->lkb_exflags, dlm_iflags_val(lkb), lkb->lkb_status,
+ lkb->lkb_rqmode, lkb->lkb_grmode, lkb->lkb_wait_type,
+ lkb->lkb_wait_nodeid, (unsigned long long)lkb->lkb_recover_seq);
}
static void dlm_print_rsb(struct dlm_rsb *r)
{
- printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
- "rlc %d name %s\n",
- r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
- r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
- r->res_name);
+ pr_err("rsb: master %d dir %d flags %lx first %x rlc %d name %s\n",
+ r->res_master_nodeid, r->res_dir_nodeid, r->res_flags,
+ r->res_first_lkid, r->res_recover_locks_count, r->res_name);
}
void dlm_dump_rsb(struct dlm_rsb *r)
@@ -243,13 +241,13 @@ static inline int is_granted(struct dlm_lkb *lkb)
static inline int is_remote(struct dlm_rsb *r)
{
- DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
- return !!r->res_nodeid;
+ DLM_ASSERT(r->res_master_nodeid != 0, dlm_print_rsb(r););
+ return r->res_master_nodeid != dlm_our_nodeid();
}
static inline int is_process_copy(struct dlm_lkb *lkb)
{
- return lkb->lkb_nodeid &&
+ return lkb->lkb_master_nodeid != dlm_our_nodeid() &&
!test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
}
@@ -819,7 +817,6 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
dlm_print_rsb(r);
/* fix it and go on */
r->res_master_nodeid = our_nodeid;
- r->res_nodeid = 0;
rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
r->res_first_lkid = 0;
}
@@ -865,7 +862,6 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
log_debug(ls, "find_rsb new from_dir %d recreate %s",
from_nodeid, r->res_name);
r->res_master_nodeid = our_nodeid;
- r->res_nodeid = 0;
goto out_add;
}
@@ -888,11 +884,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
/* When we are the dir nodeid, we can set the master
node immediately */
r->res_master_nodeid = our_nodeid;
- r->res_nodeid = 0;
} else {
/* set_master will send_lookup to dir_nodeid */
r->res_master_nodeid = 0;
- r->res_nodeid = -1;
}
out_add:
@@ -995,7 +989,6 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
our_nodeid, r->res_master_nodeid, dir_nodeid);
dlm_print_rsb(r);
r->res_master_nodeid = our_nodeid;
- r->res_nodeid = 0;
}
list_move(&r->res_rsbs_list, &ls->ls_keep);
@@ -1023,7 +1016,6 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
r->res_hash = hash;
r->res_dir_nodeid = dir_nodeid;
r->res_master_nodeid = dir_nodeid;
- r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
kref_init(&r->res_ref);
write_lock_bh(&ls->ls_rsbtbl_lock);
@@ -1103,7 +1095,8 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
return -ENOTBLK;
} else {
/* our rsb is not master, but the dir nodeid has sent us a
- request; this could happen with master 0 / res_nodeid -1 */
+ * request; this could happen with res_master_nodeid 0
+ */
if (r->res_master_nodeid) {
log_error(ls, "validate master from_dir %d master %d "
@@ -1113,7 +1106,6 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
}
r->res_master_nodeid = dlm_our_nodeid();
- r->res_nodeid = 0;
return 0;
}
}
@@ -1136,11 +1128,10 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
/* Recovery uses this function to set a new master when
* the previous master failed. Setting NEW_MASTER will
* force dlm_recover_masters to call recover_master on this
- * rsb even though the res_nodeid is no longer removed.
+ * rsb even though the res_master_nodeid is no longer removed.
*/
r->res_master_nodeid = from_nodeid;
- r->res_nodeid = from_nodeid;
rsb_set_flag(r, RSB_NEW_MASTER);
if (toss_list) {
@@ -1156,9 +1147,9 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
* cycle before recovering this master value
*/
- log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
+ log_limit(ls, "%s from_master %d master_nodeid %d first %x %s",
__func__, from_nodeid, r->res_master_nodeid,
- r->res_nodeid, r->res_first_lkid, r->res_name);
+ r->res_first_lkid, r->res_name);
if (r->res_master_nodeid == our_nodeid) {
log_error(ls, "from_master %d our_master", from_nodeid);
@@ -1167,7 +1158,6 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
}
r->res_master_nodeid = from_nodeid;
- r->res_nodeid = from_nodeid;
rsb_set_flag(r, RSB_NEW_MASTER);
}
@@ -1179,7 +1169,6 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
log_debug(ls, "%s master 0 to %d first %x %s", __func__,
from_nodeid, r->res_first_lkid, r->res_name);
r->res_master_nodeid = from_nodeid;
- r->res_nodeid = from_nodeid;
}
if (!from_master && !fix_master &&
@@ -1341,7 +1330,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
r->res_hash = hash;
r->res_dir_nodeid = our_nodeid;
r->res_master_nodeid = from_nodeid;
- r->res_nodeid = from_nodeid;
+ kref_init(&r->res_ref);
rsb_set_flag(r, RSB_TOSS);
write_lock_bh(&ls->ls_rsbtbl_lock);
@@ -1476,7 +1465,6 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
lkb->lkb_last_cb_mode = DLM_LOCK_IV;
- lkb->lkb_nodeid = -1;
lkb->lkb_grmode = DLM_LOCK_IV;
kref_init(&lkb->lkb_ref);
INIT_LIST_HEAD(&lkb->lkb_ownqueue);
@@ -2438,8 +2426,9 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
}
if (!demoted && is_demoted(lkb)) {
- log_print("WARN: pending demoted %x node %d %s",
- lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
+ log_print("WARN: pending demoted %x master_node %d %s",
+ lkb->lkb_id, lkb->lkb_master_nodeid,
+ r->res_name);
demote_restart = 1;
continue;
}
@@ -2457,7 +2446,7 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
}
} else {
log_print("WARN: pending deadlock %x node %d %s",
- lkb->lkb_id, lkb->lkb_nodeid,
+ lkb->lkb_id, lkb->lkb_master_nodeid,
r->res_name);
dlm_dump_rsb(r);
}
@@ -2526,7 +2515,8 @@ static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
int cw = 0;
if (!is_master(r)) {
- log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
+ log_print("%s r master_nodeid %d", __func__,
+ r->res_master_nodeid);
dlm_dump_rsb(r);
return;
}
@@ -2622,7 +2612,7 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
r->res_first_lkid = lkb->lkb_id;
- lkb->lkb_nodeid = r->res_nodeid;
+ lkb->lkb_master_nodeid = r->res_master_nodeid;
return 0;
}
@@ -2631,13 +2621,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
return 1;
}
- if (r->res_master_nodeid == our_nodeid) {
- lkb->lkb_nodeid = 0;
- return 0;
- }
-
if (r->res_master_nodeid) {
- lkb->lkb_nodeid = r->res_master_nodeid;
+ lkb->lkb_master_nodeid = r->res_master_nodeid;
return 0;
}
@@ -2652,8 +2637,7 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
r->res_name);
r->res_master_nodeid = our_nodeid;
- r->res_nodeid = 0;
- lkb->lkb_nodeid = 0;
+ lkb->lkb_master_nodeid = our_nodeid;
return 0;
}
@@ -2850,7 +2834,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
for success */
-/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
+/* note: it's valid for lkb_master_nodeid/res_master_nodeid to be 0 when we get here
because there may be a lookup in progress and it's valid to do
cancel/unlockf on it */
@@ -3531,7 +3515,7 @@ static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
struct dlm_message *ms)
{
- ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid);
+ ms->m_nodeid = cpu_to_le32(dlm_res_nodeid(lkb->lkb_master_nodeid));
ms->m_pid = cpu_to_le32(lkb->lkb_ownpid);
ms->m_lkid = cpu_to_le32(lkb->lkb_id);
ms->m_remid = cpu_to_le32(lkb->lkb_remid);
@@ -3578,7 +3562,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
struct dlm_mhandle *mh;
int to_nodeid, error;
- to_nodeid = r->res_nodeid;
+ to_nodeid = r->res_master_nodeid;
error = add_to_waiters(lkb, mstype, to_nodeid);
if (error)
@@ -3642,7 +3626,7 @@ static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
struct dlm_mhandle *mh;
int to_nodeid, error;
- to_nodeid = lkb->lkb_nodeid;
+ to_nodeid = lkb->lkb_master_nodeid;
error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
if (error)
@@ -3663,7 +3647,7 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
struct dlm_mhandle *mh;
int to_nodeid, error;
- to_nodeid = lkb->lkb_nodeid;
+ to_nodeid = lkb->lkb_master_nodeid;
error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
if (error)
@@ -3733,7 +3717,7 @@ static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
struct dlm_mhandle *mh;
int to_nodeid, error;
- to_nodeid = lkb->lkb_nodeid;
+ to_nodeid = lkb->lkb_master_nodeid;
error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
if (error)
@@ -3849,7 +3833,7 @@ static void fake_astfn(void *astparam)
static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
const struct dlm_message *ms)
{
- lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
+ lkb->lkb_master_nodeid = dlm_res_master_nodeid(le32_to_cpu(ms->m_header.h_nodeid));
lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
lkb->lkb_grmode = DLM_LOCK_IV;
@@ -3897,7 +3881,7 @@ static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
{
struct dlm_lkb *lkb = &ls->ls_local_lkb;
- lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
+ lkb->lkb_master_nodeid = dlm_res_master_nodeid(le32_to_cpu(ms->m_header.h_nodeid));
lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
}
@@ -3922,7 +3906,7 @@ static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
case cpu_to_le32(DLM_MSG_CONVERT):
case cpu_to_le32(DLM_MSG_UNLOCK):
case cpu_to_le32(DLM_MSG_CANCEL):
- if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
+ if (!is_master_copy(lkb) || lkb->lkb_master_nodeid != from)
error = -EINVAL;
break;
@@ -3931,14 +3915,14 @@ static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
case cpu_to_le32(DLM_MSG_GRANT):
case cpu_to_le32(DLM_MSG_BAST):
- if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
+ if (!is_process_copy(lkb) || lkb->lkb_master_nodeid != from)
error = -EINVAL;
break;
case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
if (!is_process_copy(lkb))
error = -EINVAL;
- else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
+ else if (lkb->lkb_master_nodeid != 0 && lkb->lkb_master_nodeid != from)
error = -EINVAL;
break;
@@ -3952,7 +3936,7 @@ static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
"ignore invalid message %d from %d %x %x %x %d",
le32_to_cpu(ms->m_type), from, lkb->lkb_id,
lkb->lkb_remid, dlm_iflags_val(lkb),
- lkb->lkb_nodeid);
+ lkb->lkb_master_nodeid);
return error;
}
@@ -4378,8 +4362,7 @@ static int receive_request_reply(struct dlm_ls *ls,
lookup as a request and sent request reply instead of lookup reply */
if (mstype == DLM_MSG_LOOKUP) {
r->res_master_nodeid = from_nodeid;
- r->res_nodeid = from_nodeid;
- lkb->lkb_nodeid = from_nodeid;
+ lkb->lkb_master_nodeid = from_nodeid;
}
/* this is the value returned from do_request() on the master */
@@ -4421,8 +4404,7 @@ static int receive_request_reply(struct dlm_ls *ls,
r->res_master_nodeid != dlm_our_nodeid()) {
/* cause _request_lock->set_master->send_lookup */
r->res_master_nodeid = 0;
- r->res_nodeid = -1;
- lkb->lkb_nodeid = -1;
+ lkb->lkb_master_nodeid = 0;
}
if (is_overlap(lkb)) {
@@ -4696,7 +4678,6 @@ static void receive_lookup_reply(struct dlm_ls *ls,
if (ret_nodeid == dlm_our_nodeid()) {
r->res_master_nodeid = ret_nodeid;
- r->res_nodeid = 0;
do_lookup_list = 1;
r->res_first_lkid = 0;
} else if (ret_nodeid == -1) {
@@ -4704,12 +4685,10 @@ static void receive_lookup_reply(struct dlm_ls *ls,
log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
r->res_master_nodeid = 0;
- r->res_nodeid = -1;
- lkb->lkb_nodeid = -1;
+ lkb->lkb_master_nodeid = 0;
} else {
- /* set_master() will set lkb_nodeid from r */
+ /* set_master() will set lkb_master_nodeid from r */
r->res_master_nodeid = ret_nodeid;
- r->res_nodeid = ret_nodeid;
}
if (is_overlap(lkb)) {
@@ -4976,7 +4955,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
memset(ms_local, 0, sizeof(struct dlm_message));
ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
- ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
+ ms_local->m_header.h_nodeid = cpu_to_le32(dlm_res_nodeid(lkb->lkb_master_nodeid));
_receive_convert_reply(lkb, ms_local, true);
/* Same special case as in receive_rcom_lock_args() */
@@ -5032,13 +5011,12 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
many and they aren't very interesting */
if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
- log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
- "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
+ log_debug(ls, "waiter %x remote %x msg %d master_nodeid %d lkb_master_nodeid %d wait_nodeid %d dir_nodeid %d",
lkb->lkb_id,
lkb->lkb_remid,
lkb->lkb_wait_type,
- lkb->lkb_resource->res_nodeid,
- lkb->lkb_nodeid,
+ lkb->lkb_resource->res_master_nodeid,
+ lkb->lkb_master_nodeid,
lkb->lkb_wait_nodeid,
dir_nodeid);
}
@@ -5095,7 +5073,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
memset(ms_local, 0, sizeof(struct dlm_message));
ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
- ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
+ ms_local->m_header.h_nodeid = cpu_to_le32(dlm_res_nodeid(lkb->lkb_master_nodeid));
_receive_unlock_reply(lkb, ms_local, true);
dlm_put_lkb(lkb);
break;
@@ -5105,7 +5083,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
memset(ms_local, 0, sizeof(struct dlm_message));
ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
- ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
+ ms_local->m_header.h_nodeid = cpu_to_le32(dlm_res_nodeid(lkb->lkb_master_nodeid));
_receive_cancel_reply(lkb, ms_local, true);
dlm_put_lkb(lkb);
break;
@@ -5201,11 +5179,10 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
&lkb->lkb_iflags);
err = 0;
- log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
- "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
- "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
- r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
- dlm_dir_nodeid(r), oc, ou);
+ log_debug(ls, "waiter %x remote %x msg %d master_nodeid %d lkb_master_nodeid %d wait_nodeid %d dir_nodeid %d overlap %d %d",
+ lkb->lkb_id, lkb->lkb_remid, mstype,
+ r->res_master_nodeid, lkb->lkb_master_nodeid,
+ lkb->lkb_wait_nodeid, dlm_dir_nodeid(r), oc, ou);
/*
* No reply to the pre-recovery operation will now be received,
@@ -5278,9 +5255,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
}
if (err) {
- log_error(ls, "waiter %x msg %d r_nodeid %d "
+ log_error(ls, "waiter %x msg %d master_nodeid %d "
"dir_nodeid %d overlap %d %d",
- lkb->lkb_id, mstype, r->res_nodeid,
+ lkb->lkb_id, mstype, r->res_master_nodeid,
dlm_dir_nodeid(r), oc, ou);
}
unlock_rsb(r);
@@ -5333,8 +5310,8 @@ static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
if (!is_master_copy(lkb))
continue;
- if ((lkb->lkb_nodeid == nodeid_gone) ||
- dlm_is_removed(ls, lkb->lkb_nodeid)) {
+ if ((lkb->lkb_master_nodeid == nodeid_gone) ||
+ dlm_is_removed(ls, lkb->lkb_master_nodeid)) {
/* tell recover_lvb to invalidate the lvb
because a node holding EX/PW failed */
@@ -5471,7 +5448,7 @@ static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
struct dlm_lkb *lkb;
list_for_each_entry(lkb, head, lkb_statequeue) {
- if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
+ if (lkb->lkb_master_nodeid == nodeid && lkb->lkb_remid == remid)
return lkb;
}
return NULL;
@@ -5500,7 +5477,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
{
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
- lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
+ lkb->lkb_master_nodeid = dlm_res_master_nodeid(le32_to_cpu(rc->rc_header.h_nodeid));
lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
@@ -6247,7 +6224,8 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
/* debug functionality */
int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
- int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
+ int lkb_master_nodeid, unsigned int lkb_dflags,
+ int lkb_status)
{
struct dlm_lksb *lksb;
struct dlm_lkb *lkb;
@@ -6269,7 +6247,7 @@ int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
}
dlm_set_dflags_val(lkb, lkb_dflags);
- lkb->lkb_nodeid = lkb_nodeid;
+ lkb->lkb_master_nodeid = lkb_master_nodeid;
lkb->lkb_lksb = lksb;
/* user specific pointer, just don't have it NULL for kernel locks */
if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 8de9dee4c058..e59161d2fe84 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -61,13 +61,14 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid);
void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc);
int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
- int lkb_nodeid, unsigned int lkb_flags, int lkb_status);
+ int lkb_master_nodeid, unsigned int lkb_flags,
+ int lkb_status);
int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
int mstype, int to_nodeid);
static inline int is_master(struct dlm_rsb *r)
{
- return !r->res_nodeid;
+ return r->res_master_nodeid == dlm_our_nodeid();
}
static inline void lock_rsb(struct dlm_rsb *r)
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 9c3118a698d6..e011167a924a 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -660,7 +660,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
}
} else if (force == 1) {
xa_for_each(&ls->ls_lkbxa, id, lkb) {
- if (lkb->lkb_nodeid == 0 &&
+ if (lkb->lkb_master_nodeid == dlm_our_nodeid() &&
lkb->lkb_grmode != DLM_LOCK_IV) {
rv = 1;
break;
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index be1a71a6303a..b545cd7a23cd 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -455,8 +455,8 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq)
if (lkb->lkb_lvbptr)
len += ls->ls_lvblen;
- error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh,
- seq);
+ error = create_rcom(ls, r->res_master_nodeid, DLM_RCOM_LOCK, len, &rc,
+ &mh, seq);
if (error)
goto out;
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index d156196b9e69..d948d28d8f92 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -408,7 +408,7 @@ static void set_lock_master(struct list_head *queue, int nodeid)
list_for_each_entry(lkb, queue, lkb_statequeue) {
if (!test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
- lkb->lkb_nodeid = nodeid;
+ lkb->lkb_master_nodeid = nodeid;
lkb->lkb_remid = 0;
}
}
@@ -416,9 +416,9 @@ static void set_lock_master(struct list_head *queue, int nodeid)
static void set_master_lkbs(struct dlm_rsb *r)
{
- set_lock_master(&r->res_grantqueue, r->res_nodeid);
- set_lock_master(&r->res_convertqueue, r->res_nodeid);
- set_lock_master(&r->res_waitqueue, r->res_nodeid);
+ set_lock_master(&r->res_grantqueue, r->res_master_nodeid);
+ set_lock_master(&r->res_convertqueue, r->res_master_nodeid);
+ set_lock_master(&r->res_waitqueue, r->res_master_nodeid);
}
/*
@@ -455,7 +455,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
if (is_master(r))
return 0;
- is_removed = dlm_is_removed(ls, r->res_nodeid);
+ is_removed = dlm_is_removed(ls, r->res_master_nodeid);
if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
return 0;
@@ -464,10 +464,8 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
dir_nodeid = dlm_dir_nodeid(r);
if (dir_nodeid == our_nodeid) {
- if (is_removed) {
+ if (is_removed)
r->res_master_nodeid = our_nodeid;
- r->res_nodeid = 0;
- }
/* set master of lkbs to ourself when is_removed, or to
another new master which we set along with NEW_MASTER
@@ -501,14 +499,9 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
{
int dir_nodeid = dlm_dir_nodeid(r);
- int new_master = dir_nodeid;
-
- if (dir_nodeid == dlm_our_nodeid())
- new_master = 0;
dlm_purge_mstcpy_locks(r);
r->res_master_nodeid = dir_nodeid;
- r->res_nodeid = new_master;
set_new_master(r);
(*count)++;
return 0;
@@ -584,7 +577,6 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc)
lock_rsb(r);
r->res_master_nodeid = ret_nodeid;
- r->res_nodeid = new_master;
set_new_master(r);
unlock_rsb(r);
recover_xa_del(r);
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 7e6c2c27d815..aae97bdc05f6 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -34,7 +34,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
read_lock_bh(&ls->ls_rsbtbl_lock);
list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
- if (r->res_nodeid)
+ if (r->res_master_nodeid != dlm_our_nodeid())
continue;
list_add(&r->res_masters_list, &ls->ls_masters_list);
--
2.43.0
next prev parent reply other threads:[~2024-05-28 21:12 UTC|newest]
Thread overview: 11+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-05-28 21:12 [PATCH v6.10-rc1 01/11] dlm: remove scand leftovers Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 02/11] dlm: don't kref_init rsbs created for toss list Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 03/11] dlm: remove unused parameter in dlm_midcomms_addr Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 04/11] dlm: remove ls_local_handle from struct dlm_ls Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 05/11] dlm: rename dlm_find_lockspace_local to dlm_get_lockspace Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 06/11] dlm: drop own rsb pre allocation mechanism Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 07/11] dlm: using rcu to avoid rsb lookup again Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 08/11] dlm: move lkb idr to xarray datastructure Alexander Aring
2024-05-28 21:12 ` [PATCH v6.10-rc1 09/11] dlm: move recover " Alexander Aring
2024-05-28 21:12 ` Alexander Aring [this message]
2024-05-28 21:12 ` [PATCH v6.10-rc1 11/11] dlm: use is_master() where it's prossible Alexander Aring
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240528211241.2140441-10-aahringo@redhat.com \
--to=aahringo@redhat.com \
--cc=gfs2@lists.linux.dev \
--cc=teigland@redhat.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox