All of lore.kernel.org
 help / color / mirror / Atom feed
From: swhiteho@redhat.com <swhiteho@redhat.com>
To: cluster-devel.redhat.com
Subject: [Cluster-devel] [PATCH 51/51] [DLM] block dlm_recv in recovery transition
Date: Thu,  4 Oct 2007 09:49:44 +0100	[thread overview]
Message-ID: <1191487882346-git-send-email-swhiteho@redhat.com> (raw)
In-Reply-To: <1191487880270-git-send-email-swhiteho@redhat.com>

From: David Teigland <teigland@redhat.com>

Introduce a per-lockspace rwsem that's held in read mode by dlm_recv
threads while working in the dlm.  This allows dlm_recv activity to be
suspended when the lockspace transitions to, from and between recovery
cycles.

The specific bug prompting this change is one where an in-progress
recovery cycle is aborted by a new recovery cycle.  While dlm_recv was
processing a recovery message, the recovery cycle was aborted and
dlm_recoverd began cleaning up.  dlm_recv decremented recover_locks_count
on an rsb after dlm_recoverd had reset it to zero.  This is fixed by
suspending dlm_recv (taking write lock on the rwsem) before aborting the
current recovery.

The transitions to/from normal and recovery modes are simplified by using
this new ability to block dlm_recv.  Switching from normal to recovery mode
means dlm_recv stops processing locking messages and saves them for later;
switching back means it resumes processing them.  Races are avoided by
blocking dlm_recv while setting the flag that switches between modes.

Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 74901e9..d2fc238 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -491,6 +491,7 @@ struct dlm_ls {
 	uint64_t		ls_recover_seq;
 	struct dlm_recover	*ls_recover_args;
 	struct rw_semaphore	ls_in_recovery;	/* block local requests */
+	struct rw_semaphore	ls_recv_active;	/* block dlm_recv */
 	struct list_head	ls_requestqueue;/* queue remote requests */
 	struct mutex		ls_requestqueue_mutex;
 	char			*ls_recover_buf;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 031229f..3915b8e 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3638,55 +3638,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	dlm_put_lkb(lkb);
 }
 
-int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
+static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
 {
-	struct dlm_message *ms = (struct dlm_message *) hd;
-	struct dlm_ls *ls;
-	int error = 0;
-
-	if (!recovery)
-		dlm_message_in(ms);
-
-	ls = dlm_find_lockspace_global(hd->h_lockspace);
-	if (!ls) {
-		log_print("drop message %d from %d for unknown lockspace %d",
-			  ms->m_type, nodeid, hd->h_lockspace);
-		return -EINVAL;
-	}
-
-	/* recovery may have just ended leaving a bunch of backed-up requests
-	   in the requestqueue; wait while dlm_recoverd clears them */
-
-	if (!recovery)
-		dlm_wait_requestqueue(ls);
-
-	/* recovery may have just started while there were a bunch of
-	   in-flight requests -- save them in requestqueue to be processed
-	   after recovery.  we can't let dlm_recvd block on the recovery
-	   lock.  if dlm_recoverd is calling this function to clear the
-	   requestqueue, it needs to be interrupted (-EINTR) if another
-	   recovery operation is starting. */
-
-	while (1) {
-		if (dlm_locking_stopped(ls)) {
-			if (recovery) {
-				error = -EINTR;
-				goto out;
-			}
-			error = dlm_add_requestqueue(ls, nodeid, hd);
-			if (error == -EAGAIN)
-				continue;
-			else {
-				error = -EINTR;
-				goto out;
-			}
-		}
-
-		if (dlm_lock_recovery_try(ls))
-			break;
-		schedule();
-	}
-
 	switch (ms->m_type) {
 
 	/* messages sent to a master node */
@@ -3761,17 +3714,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
 		log_error(ls, "unknown message type %d", ms->m_type);
 	}
 
-	dlm_unlock_recovery(ls);
- out:
-	dlm_put_lockspace(ls);
 	dlm_astd_wake();
-	return error;
 }
 
+/* If the lockspace is in recovery mode (locking stopped), then normal
+   messages are saved on the requestqueue for processing after recovery is
+   done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
+   messages off the requestqueue before we process new ones. This occurs right
+   after recovery completes when we transition from saving all messages on
+   requestqueue, to processing all the saved messages, to processing new
+   messages as they arrive. */
 
-/*
- * Recovery related
- */
+static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
+				int nodeid)
+{
+	if (dlm_locking_stopped(ls)) {
+		dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
+	} else {
+		dlm_wait_requestqueue(ls);
+		_receive_message(ls, ms);
+	}
+}
+
+/* This is called by dlm_recoverd to process messages that were saved on
+   the requestqueue. */
+
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
+{
+	_receive_message(ls, ms);
+}
+
+/* This is called by the midcomms layer when something is received for
+   the lockspace.  It could be either a MSG (normal message sent as part of
+   standard locking activity) or an RCOM (recovery message sent as part of
+   lockspace recovery). */
+
+void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
+{
+	struct dlm_message *ms = (struct dlm_message *) hd;
+	struct dlm_rcom *rc = (struct dlm_rcom *) hd;
+	struct dlm_ls *ls;
+	int type = 0;
+
+	switch (hd->h_cmd) {
+	case DLM_MSG:
+		dlm_message_in(ms);
+		type = ms->m_type;
+		break;
+	case DLM_RCOM:
+		dlm_rcom_in(rc);
+		type = rc->rc_type;
+		break;
+	default:
+		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
+		return;
+	}
+
+	if (hd->h_nodeid != nodeid) {
+		log_print("invalid h_nodeid %d from %d lockspace %x",
+			  hd->h_nodeid, nodeid, hd->h_lockspace);
+		return;
+	}
+
+	ls = dlm_find_lockspace_global(hd->h_lockspace);
+	if (!ls) {
+		log_print("invalid h_lockspace %x from %d cmd %d type %d",
+			  hd->h_lockspace, nodeid, hd->h_cmd, type);
+
+		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
+			dlm_send_ls_not_ready(nodeid, rc);
+		return;
+	}
+
+	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
+	   be inactive (in this ls) before transitioning to recovery mode */
+
+	down_read(&ls->ls_recv_active);
+	if (hd->h_cmd == DLM_MSG)
+		dlm_receive_message(ls, ms, nodeid);
+	else
+		dlm_receive_rcom(ls, rc, nodeid);
+	up_read(&ls->ls_recv_active);
+
+	dlm_put_lockspace(ls);
+}
 
 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 1720313..ada0468 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -16,7 +16,8 @@
 void dlm_print_rsb(struct dlm_rsb *r);
 void dlm_dump_rsb(struct dlm_rsb *r);
 void dlm_print_lkb(struct dlm_lkb *lkb);
-int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery);
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms);
+void dlm_receive_buffer(struct dlm_header *hd, int nodeid);
 int dlm_modes_compat(int mode1, int mode2);
 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
 	unsigned int flags, struct dlm_rsb **r_ret);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 1dc7210..628eaa6 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -519,6 +519,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
 	ls->ls_recover_seq = 0;
 	ls->ls_recover_args = NULL;
 	init_rwsem(&ls->ls_in_recovery);
+	init_rwsem(&ls->ls_recv_active);
 	INIT_LIST_HEAD(&ls->ls_requestqueue);
 	mutex_init(&ls->ls_requestqueue_mutex);
 	mutex_init(&ls->ls_clear_proc_locks);
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index d099775..e9cdcab 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -18,10 +18,6 @@
 #include "rcom.h"
 #include "config.h"
 
-/*
- * Following called by dlm_recoverd thread
- */
-
 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
 {
 	struct dlm_member *memb = NULL;
@@ -250,18 +246,30 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
 	return error;
 }
 
-/*
- * Following called from lockspace.c
- */
+/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
+   dlm_ls_start() is called on any of them to start the new recovery. */
 
 int dlm_ls_stop(struct dlm_ls *ls)
 {
 	int new;
 
 	/*
-	 * A stop cancels any recovery that's in progress (see RECOVERY_STOP,
-	 * dlm_recovery_stopped()) and prevents any new locks from being
-	 * processed (see RUNNING, dlm_locking_stopped()).
+	 * Prevent dlm_recv from being in the middle of something when we do
+	 * the stop.  This includes ensuring dlm_recv isn't processing a
+	 * recovery message (rcom), while dlm_recoverd is aborting and
+	 * resetting things from an in-progress recovery.  i.e. we want
+	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
+	 * processing an rcom at the same time.  Stopping dlm_recv also makes
+	 * it easy for dlm_receive_message() to check locking stopped and add a
+	 * message to the requestqueue without races.
+	 */
+
+	down_write(&ls->ls_recv_active);
+
+	/*
+	 * Abort any recovery that's in progress (see RECOVERY_STOP,
+	 * dlm_recovery_stopped()) and tell any other threads running in the
+	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
 	 */
 
 	spin_lock(&ls->ls_recover_lock);
@@ -271,8 +279,14 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	spin_unlock(&ls->ls_recover_lock);
 
 	/*
+	 * Let dlm_recv run again, now any normal messages will be saved on the
+	 * requestqueue for later.
+	 */
+
+	up_write(&ls->ls_recv_active);
+
+	/*
 	 * This in_recovery lock does two things:
-	 *
 	 * 1) Keeps this function from returning until all threads are out
 	 *    of locking routines and locking is truely stopped.
 	 * 2) Keeps any new requests from being processed until it's unlocked
@@ -284,9 +298,8 @@ int dlm_ls_stop(struct dlm_ls *ls)
 
 	/*
 	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
-	 * running) has noticed the clearing of RUNNING above and quit
-	 * processing the previous recovery.  This will be true for all nodes
-	 * before any nodes start the new recovery.
+	 * running) has noticed RECOVERY_STOP above and quit processing the
+	 * previous recovery.
 	 */
 
 	dlm_recoverd_suspend(ls);
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index a5126e0..f8c69dd 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -27,7 +27,6 @@
 #include "dlm_internal.h"
 #include "lowcomms.h"
 #include "config.h"
-#include "rcom.h"
 #include "lock.h"
 #include "midcomms.h"
 
@@ -117,19 +116,7 @@ int dlm_process_incoming_buffer(int nodeid, const void *base,
 		offset &= (limit - 1);
 		len -= msglen;
 
-		switch (msg->h_cmd) {
-		case DLM_MSG:
-			dlm_receive_message(msg, nodeid, 0);
-			break;
-
-		case DLM_RCOM:
-			dlm_receive_rcom(msg, nodeid);
-			break;
-
-		default:
-			log_print("unknown msg type %x from %u: %u %u %u %u",
-				  msg->h_cmd, nodeid, msglen, len, offset, ret);
-		}
+		dlm_receive_buffer(msg, nodeid);
 	}
 
 	if (msg != (struct dlm_header *) __tmp)
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 188b91c..ae2fd97 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -386,7 +386,10 @@ static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 	dlm_recover_process_copy(ls, rc_in);
 }
 
-static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
+/* If the lockspace doesn't exist then still send a status message
+   back; it's possible that it just doesn't have its global_id yet. */
+
+int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 {
 	struct dlm_rcom *rc;
 	struct rcom_config *rf;
@@ -446,28 +449,11 @@ static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
 	return rv;
 }
 
-/* Called by dlm_recvd; corresponds to dlm_receive_message() but special
+/* Called by dlm_recv; corresponds to dlm_receive_message() but special
    recovery-only comms are sent through here. */
 
-void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
+void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 {
-	struct dlm_rcom *rc = (struct dlm_rcom *) hd;
-	struct dlm_ls *ls;
-
-	dlm_rcom_in(rc);
-
-	/* If the lockspace doesn't exist then still send a status message
-	   back; it's possible that it just doesn't have its global_id yet. */
-
-	ls = dlm_find_lockspace_global(hd->h_lockspace);
-	if (!ls) {
-		log_print("lockspace %x from %d type %x not found",
-			  hd->h_lockspace, nodeid, rc->rc_type);
-		if (rc->rc_type == DLM_RCOM_STATUS)
-			send_ls_not_ready(nodeid, rc);
-		return;
-	}
-
 	if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
 		log_debug(ls, "ignoring recovery message %x from %d",
 			  rc->rc_type, nodeid);
@@ -477,12 +463,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
 	if (is_old_reply(ls, rc))
 		goto out;
 
-	if (nodeid != rc->rc_header.h_nodeid) {
-		log_error(ls, "bad rcom nodeid %d from %d",
-			  rc->rc_header.h_nodeid, nodeid);
-		goto out;
-	}
-
 	switch (rc->rc_type) {
 	case DLM_RCOM_STATUS:
 		receive_rcom_status(ls, rc);
@@ -520,6 +500,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
 		DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type););
 	}
  out:
-	dlm_put_lockspace(ls);
+	return;
 }
 
diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h
index d798432..b09abd2 100644
--- a/fs/dlm/rcom.h
+++ b/fs/dlm/rcom.h
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -18,7 +18,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid);
 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
-void dlm_receive_rcom(struct dlm_header *hd, int nodeid);
+void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
+int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);
 
 #endif
 
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 6657599..4b89e20 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -24,19 +24,28 @@
 
 
 /* If the start for which we're re-enabling locking (seq) has been superseded
-   by a newer stop (ls_recover_seq), we need to leave locking disabled. */
+   by a newer stop (ls_recover_seq), we need to leave locking disabled.
+
+   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
+   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
+   enables locking and clears the requestqueue between a and b. */
 
 static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 {
 	int error = -EINTR;
 
+	down_write(&ls->ls_recv_active);
+
 	spin_lock(&ls->ls_recover_lock);
 	if (ls->ls_recover_seq == seq) {
 		set_bit(LSFL_RUNNING, &ls->ls_flags);
+		/* unblocks processes waiting to enter the dlm */
 		up_write(&ls->ls_in_recovery);
 		error = 0;
 	}
 	spin_unlock(&ls->ls_recover_lock);
+
+	up_write(&ls->ls_recv_active);
 	return error;
 }
 
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 65008d7..0de04f1 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -20,7 +20,7 @@
 struct rq_entry {
 	struct list_head list;
 	int nodeid;
-	char request[1];
+	char request[0];
 };
 
 /*
@@ -30,42 +30,39 @@ struct rq_entry {
  * lockspace is enabled on some while still suspended on others.
  */
 
-int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd)
+void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd)
 {
 	struct rq_entry *e;
 	int length = hd->h_length;
-	int rv = 0;
 
 	e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL);
 	if (!e) {
-		log_print("dlm_add_requestqueue: out of memory\n");
-		return 0;
+		log_print("dlm_add_requestqueue: out of memory len %d", length);
+		return;
 	}
 
 	e->nodeid = nodeid;
 	memcpy(e->request, hd, length);
 
-	/* We need to check dlm_locking_stopped() after taking the mutex to
-	   avoid a race where dlm_recoverd enables locking and runs
-	   process_requestqueue between our earlier dlm_locking_stopped check
-	   and this addition to the requestqueue. */
-
 	mutex_lock(&ls->ls_requestqueue_mutex);
-	if (dlm_locking_stopped(ls))
-		list_add_tail(&e->list, &ls->ls_requestqueue);
-	else {
-		log_debug(ls, "dlm_add_requestqueue skip from %d", nodeid);
-		kfree(e);
-		rv = -EAGAIN;
-	}
+	list_add_tail(&e->list, &ls->ls_requestqueue);
 	mutex_unlock(&ls->ls_requestqueue_mutex);
-	return rv;
 }
 
+/*
+ * Called by dlm_recoverd to process normal messages saved while recovery was
+ * happening.  Normal locking has been enabled before this is called.  dlm_recv
+ * upon receiving a message, will wait for all saved messages to be drained
+ * here before processing the message it got.  If a new dlm_ls_stop() arrives
+ * while we're processing these saved messages, it may block trying to suspend
+ * dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue.  In that
+ * case, we don't abort since locking_stopped is still 0.  If dlm_recv is not
+ * waiting for us, then this processing may be aborted due to locking_stopped.
+ */
+
 int dlm_process_requestqueue(struct dlm_ls *ls)
 {
 	struct rq_entry *e;
-	struct dlm_header *hd;
 	int error = 0;
 
 	mutex_lock(&ls->ls_requestqueue_mutex);
@@ -79,14 +76,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 		e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
 		mutex_unlock(&ls->ls_requestqueue_mutex);
 
-		hd = (struct dlm_header *) e->request;
-		error = dlm_receive_message(hd, e->nodeid, 1);
-
-		if (error == -EINTR) {
-			/* entry is left on requestqueue */
-			log_debug(ls, "process_requestqueue abort eintr");
-			break;
-		}
+		dlm_receive_message_saved(ls, (struct dlm_message *)e->request);
 
 		mutex_lock(&ls->ls_requestqueue_mutex);
 		list_del(&e->list);
@@ -106,10 +96,12 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 
 /*
  * After recovery is done, locking is resumed and dlm_recoverd takes all the
- * saved requests and processes them as they would have been by dlm_recvd.  At
- * the same time, dlm_recvd will start receiving new requests from remote
- * nodes.  We want to delay dlm_recvd processing new requests until
- * dlm_recoverd has finished processing the old saved requests.
+ * saved requests and processes them as they would have been by dlm_recv.  At
+ * the same time, dlm_recv will start receiving new requests from remote nodes.
+ * We want to delay dlm_recv processing new requests until dlm_recoverd has
+ * finished processing the old saved requests.  We don't check for locking
+ * stopped here because dlm_ls_stop won't stop locking until it's suspended us
+ * (dlm_recv).
  */
 
 void dlm_wait_requestqueue(struct dlm_ls *ls)
@@ -118,8 +110,6 @@ void dlm_wait_requestqueue(struct dlm_ls *ls)
 		mutex_lock(&ls->ls_requestqueue_mutex);
 		if (list_empty(&ls->ls_requestqueue))
 			break;
-		if (dlm_locking_stopped(ls))
-			break;
 		mutex_unlock(&ls->ls_requestqueue_mutex);
 		schedule();
 	}
diff --git a/fs/dlm/requestqueue.h b/fs/dlm/requestqueue.h
index 6a53ea0..aba34fc 100644
--- a/fs/dlm/requestqueue.h
+++ b/fs/dlm/requestqueue.h
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -13,7 +13,7 @@
 #ifndef __REQUESTQUEUE_DOT_H__
 #define __REQUESTQUEUE_DOT_H__
 
-int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd);
+void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd);
 int dlm_process_requestqueue(struct dlm_ls *ls);
 void dlm_wait_requestqueue(struct dlm_ls *ls);
 void dlm_purge_requestqueue(struct dlm_ls *ls);
-- 
1.5.1.2



WARNING: multiple messages have this Message-ID (diff)
From: swhiteho@redhat.com
To: linux-kernel@vger.kernel.org, cluster-devel@redhat.com
Cc: David Teigland <teigland@redhat.com>,
	Steven Whitehouse <swhiteho@redhat.com>
Subject: [PATCH 51/51] [DLM] block dlm_recv in recovery transition
Date: Thu,  4 Oct 2007 09:49:44 +0100	[thread overview]
Message-ID: <1191487882346-git-send-email-swhiteho@redhat.com> (raw)
In-Reply-To: <1191487880270-git-send-email-swhiteho@redhat.com>

From: David Teigland <teigland@redhat.com>

Introduce a per-lockspace rwsem that's held in read mode by dlm_recv
threads while working in the dlm.  This allows dlm_recv activity to be
suspended when the lockspace transitions to, from and between recovery
cycles.

The specific bug prompting this change is one where an in-progress
recovery cycle is aborted by a new recovery cycle.  While dlm_recv was
processing a recovery message, the recovery cycle was aborted and
dlm_recoverd began cleaning up.  dlm_recv decremented recover_locks_count
on an rsb after dlm_recoverd had reset it to zero.  This is fixed by
suspending dlm_recv (taking write lock on the rwsem) before aborting the
current recovery.

The transitions to/from normal and recovery modes are simplified by using
this new ability to block dlm_recv.  Switching from normal to recovery mode
means dlm_recv stops processing locking messages and saves them for later;
switching back means it resumes processing them.  Races are avoided by
blocking dlm_recv while setting the flag that switches between modes.

Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 74901e9..d2fc238 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -491,6 +491,7 @@ struct dlm_ls {
 	uint64_t		ls_recover_seq;
 	struct dlm_recover	*ls_recover_args;
 	struct rw_semaphore	ls_in_recovery;	/* block local requests */
+	struct rw_semaphore	ls_recv_active;	/* block dlm_recv */
 	struct list_head	ls_requestqueue;/* queue remote requests */
 	struct mutex		ls_requestqueue_mutex;
 	char			*ls_recover_buf;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 031229f..3915b8e 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3638,55 +3638,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	dlm_put_lkb(lkb);
 }
 
-int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
+static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
 {
-	struct dlm_message *ms = (struct dlm_message *) hd;
-	struct dlm_ls *ls;
-	int error = 0;
-
-	if (!recovery)
-		dlm_message_in(ms);
-
-	ls = dlm_find_lockspace_global(hd->h_lockspace);
-	if (!ls) {
-		log_print("drop message %d from %d for unknown lockspace %d",
-			  ms->m_type, nodeid, hd->h_lockspace);
-		return -EINVAL;
-	}
-
-	/* recovery may have just ended leaving a bunch of backed-up requests
-	   in the requestqueue; wait while dlm_recoverd clears them */
-
-	if (!recovery)
-		dlm_wait_requestqueue(ls);
-
-	/* recovery may have just started while there were a bunch of
-	   in-flight requests -- save them in requestqueue to be processed
-	   after recovery.  we can't let dlm_recvd block on the recovery
-	   lock.  if dlm_recoverd is calling this function to clear the
-	   requestqueue, it needs to be interrupted (-EINTR) if another
-	   recovery operation is starting. */
-
-	while (1) {
-		if (dlm_locking_stopped(ls)) {
-			if (recovery) {
-				error = -EINTR;
-				goto out;
-			}
-			error = dlm_add_requestqueue(ls, nodeid, hd);
-			if (error == -EAGAIN)
-				continue;
-			else {
-				error = -EINTR;
-				goto out;
-			}
-		}
-
-		if (dlm_lock_recovery_try(ls))
-			break;
-		schedule();
-	}
-
 	switch (ms->m_type) {
 
 	/* messages sent to a master node */
@@ -3761,17 +3714,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
 		log_error(ls, "unknown message type %d", ms->m_type);
 	}
 
-	dlm_unlock_recovery(ls);
- out:
-	dlm_put_lockspace(ls);
 	dlm_astd_wake();
-	return error;
 }
 
+/* If the lockspace is in recovery mode (locking stopped), then normal
+   messages are saved on the requestqueue for processing after recovery is
+   done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
+   messages off the requestqueue before we process new ones. This occurs right
+   after recovery completes when we transition from saving all messages on
+   requestqueue, to processing all the saved messages, to processing new
+   messages as they arrive. */
 
-/*
- * Recovery related
- */
+static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
+				int nodeid)
+{
+	if (dlm_locking_stopped(ls)) {
+		dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
+	} else {
+		dlm_wait_requestqueue(ls);
+		_receive_message(ls, ms);
+	}
+}
+
+/* This is called by dlm_recoverd to process messages that were saved on
+   the requestqueue. */
+
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
+{
+	_receive_message(ls, ms);
+}
+
+/* This is called by the midcomms layer when something is received for
+   the lockspace.  It could be either a MSG (normal message sent as part of
+   standard locking activity) or an RCOM (recovery message sent as part of
+   lockspace recovery). */
+
+void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
+{
+	struct dlm_message *ms = (struct dlm_message *) hd;
+	struct dlm_rcom *rc = (struct dlm_rcom *) hd;
+	struct dlm_ls *ls;
+	int type = 0;
+
+	switch (hd->h_cmd) {
+	case DLM_MSG:
+		dlm_message_in(ms);
+		type = ms->m_type;
+		break;
+	case DLM_RCOM:
+		dlm_rcom_in(rc);
+		type = rc->rc_type;
+		break;
+	default:
+		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
+		return;
+	}
+
+	if (hd->h_nodeid != nodeid) {
+		log_print("invalid h_nodeid %d from %d lockspace %x",
+			  hd->h_nodeid, nodeid, hd->h_lockspace);
+		return;
+	}
+
+	ls = dlm_find_lockspace_global(hd->h_lockspace);
+	if (!ls) {
+		log_print("invalid h_lockspace %x from %d cmd %d type %d",
+			  hd->h_lockspace, nodeid, hd->h_cmd, type);
+
+		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
+			dlm_send_ls_not_ready(nodeid, rc);
+		return;
+	}
+
+	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
+	   be inactive (in this ls) before transitioning to recovery mode */
+
+	down_read(&ls->ls_recv_active);
+	if (hd->h_cmd == DLM_MSG)
+		dlm_receive_message(ls, ms, nodeid);
+	else
+		dlm_receive_rcom(ls, rc, nodeid);
+	up_read(&ls->ls_recv_active);
+
+	dlm_put_lockspace(ls);
+}
 
 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 1720313..ada0468 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -16,7 +16,8 @@
 void dlm_print_rsb(struct dlm_rsb *r);
 void dlm_dump_rsb(struct dlm_rsb *r);
 void dlm_print_lkb(struct dlm_lkb *lkb);
-int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery);
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms);
+void dlm_receive_buffer(struct dlm_header *hd, int nodeid);
 int dlm_modes_compat(int mode1, int mode2);
 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
 	unsigned int flags, struct dlm_rsb **r_ret);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 1dc7210..628eaa6 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -519,6 +519,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
 	ls->ls_recover_seq = 0;
 	ls->ls_recover_args = NULL;
 	init_rwsem(&ls->ls_in_recovery);
+	init_rwsem(&ls->ls_recv_active);
 	INIT_LIST_HEAD(&ls->ls_requestqueue);
 	mutex_init(&ls->ls_requestqueue_mutex);
 	mutex_init(&ls->ls_clear_proc_locks);
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index d099775..e9cdcab 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -18,10 +18,6 @@
 #include "rcom.h"
 #include "config.h"
 
-/*
- * Following called by dlm_recoverd thread
- */
-
 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
 {
 	struct dlm_member *memb = NULL;
@@ -250,18 +246,30 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
 	return error;
 }
 
-/*
- * Following called from lockspace.c
- */
+/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
+   dlm_ls_start() is called on any of them to start the new recovery. */
 
 int dlm_ls_stop(struct dlm_ls *ls)
 {
 	int new;
 
 	/*
-	 * A stop cancels any recovery that's in progress (see RECOVERY_STOP,
-	 * dlm_recovery_stopped()) and prevents any new locks from being
-	 * processed (see RUNNING, dlm_locking_stopped()).
+	 * Prevent dlm_recv from being in the middle of something when we do
+	 * the stop.  This includes ensuring dlm_recv isn't processing a
+	 * recovery message (rcom), while dlm_recoverd is aborting and
+	 * resetting things from an in-progress recovery.  i.e. we want
+	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
+	 * processing an rcom at the same time.  Stopping dlm_recv also makes
+	 * it easy for dlm_receive_message() to check locking stopped and add a
+	 * message to the requestqueue without races.
+	 */
+
+	down_write(&ls->ls_recv_active);
+
+	/*
+	 * Abort any recovery that's in progress (see RECOVERY_STOP,
+	 * dlm_recovery_stopped()) and tell any other threads running in the
+	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
 	 */
 
 	spin_lock(&ls->ls_recover_lock);
@@ -271,8 +279,14 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	spin_unlock(&ls->ls_recover_lock);
 
 	/*
+	 * Let dlm_recv run again, now any normal messages will be saved on the
+	 * requestqueue for later.
+	 */
+
+	up_write(&ls->ls_recv_active);
+
+	/*
 	 * This in_recovery lock does two things:
-	 *
 	 * 1) Keeps this function from returning until all threads are out
 	 *    of locking routines and locking is truely stopped.
 	 * 2) Keeps any new requests from being processed until it's unlocked
@@ -284,9 +298,8 @@ int dlm_ls_stop(struct dlm_ls *ls)
 
 	/*
 	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
-	 * running) has noticed the clearing of RUNNING above and quit
-	 * processing the previous recovery.  This will be true for all nodes
-	 * before any nodes start the new recovery.
+	 * running) has noticed RECOVERY_STOP above and quit processing the
+	 * previous recovery.
 	 */
 
 	dlm_recoverd_suspend(ls);
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index a5126e0..f8c69dd 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -27,7 +27,6 @@
 #include "dlm_internal.h"
 #include "lowcomms.h"
 #include "config.h"
-#include "rcom.h"
 #include "lock.h"
 #include "midcomms.h"
 
@@ -117,19 +116,7 @@ int dlm_process_incoming_buffer(int nodeid, const void *base,
 		offset &= (limit - 1);
 		len -= msglen;
 
-		switch (msg->h_cmd) {
-		case DLM_MSG:
-			dlm_receive_message(msg, nodeid, 0);
-			break;
-
-		case DLM_RCOM:
-			dlm_receive_rcom(msg, nodeid);
-			break;
-
-		default:
-			log_print("unknown msg type %x from %u: %u %u %u %u",
-				  msg->h_cmd, nodeid, msglen, len, offset, ret);
-		}
+		dlm_receive_buffer(msg, nodeid);
 	}
 
 	if (msg != (struct dlm_header *) __tmp)
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 188b91c..ae2fd97 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -386,7 +386,10 @@ static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 	dlm_recover_process_copy(ls, rc_in);
 }
 
-static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
+/* If the lockspace doesn't exist then still send a status message
+   back; it's possible that it just doesn't have its global_id yet. */
+
+int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 {
 	struct dlm_rcom *rc;
 	struct rcom_config *rf;
@@ -446,28 +449,11 @@ static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
 	return rv;
 }
 
-/* Called by dlm_recvd; corresponds to dlm_receive_message() but special
+/* Called by dlm_recv; corresponds to dlm_receive_message() but special
    recovery-only comms are sent through here. */
 
-void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
+void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 {
-	struct dlm_rcom *rc = (struct dlm_rcom *) hd;
-	struct dlm_ls *ls;
-
-	dlm_rcom_in(rc);
-
-	/* If the lockspace doesn't exist then still send a status message
-	   back; it's possible that it just doesn't have its global_id yet. */
-
-	ls = dlm_find_lockspace_global(hd->h_lockspace);
-	if (!ls) {
-		log_print("lockspace %x from %d type %x not found",
-			  hd->h_lockspace, nodeid, rc->rc_type);
-		if (rc->rc_type == DLM_RCOM_STATUS)
-			send_ls_not_ready(nodeid, rc);
-		return;
-	}
-
 	if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
 		log_debug(ls, "ignoring recovery message %x from %d",
 			  rc->rc_type, nodeid);
@@ -477,12 +463,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
 	if (is_old_reply(ls, rc))
 		goto out;
 
-	if (nodeid != rc->rc_header.h_nodeid) {
-		log_error(ls, "bad rcom nodeid %d from %d",
-			  rc->rc_header.h_nodeid, nodeid);
-		goto out;
-	}
-
 	switch (rc->rc_type) {
 	case DLM_RCOM_STATUS:
 		receive_rcom_status(ls, rc);
@@ -520,6 +500,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
 		DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type););
 	}
  out:
-	dlm_put_lockspace(ls);
+	return;
 }
 
diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h
index d798432..b09abd2 100644
--- a/fs/dlm/rcom.h
+++ b/fs/dlm/rcom.h
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -18,7 +18,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid);
 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
-void dlm_receive_rcom(struct dlm_header *hd, int nodeid);
+void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
+int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);
 
 #endif
 
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 6657599..4b89e20 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -24,19 +24,28 @@
 
 
 /* If the start for which we're re-enabling locking (seq) has been superseded
-   by a newer stop (ls_recover_seq), we need to leave locking disabled. */
+   by a newer stop (ls_recover_seq), we need to leave locking disabled.
+
+   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
+   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
+   enables locking and clears the requestqueue between a and b. */
 
 static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 {
 	int error = -EINTR;
 
+	down_write(&ls->ls_recv_active);
+
 	spin_lock(&ls->ls_recover_lock);
 	if (ls->ls_recover_seq == seq) {
 		set_bit(LSFL_RUNNING, &ls->ls_flags);
+		/* unblocks processes waiting to enter the dlm */
 		up_write(&ls->ls_in_recovery);
 		error = 0;
 	}
 	spin_unlock(&ls->ls_recover_lock);
+
+	up_write(&ls->ls_recv_active);
 	return error;
 }
 
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 65008d7..0de04f1 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -20,7 +20,7 @@
 struct rq_entry {
 	struct list_head list;
 	int nodeid;
-	char request[1];
+	char request[0];
 };
 
 /*
@@ -30,42 +30,39 @@ struct rq_entry {
  * lockspace is enabled on some while still suspended on others.
  */
 
-int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd)
+void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd)
 {
 	struct rq_entry *e;
 	int length = hd->h_length;
-	int rv = 0;
 
 	e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL);
 	if (!e) {
-		log_print("dlm_add_requestqueue: out of memory\n");
-		return 0;
+		log_print("dlm_add_requestqueue: out of memory len %d", length);
+		return;
 	}
 
 	e->nodeid = nodeid;
 	memcpy(e->request, hd, length);
 
-	/* We need to check dlm_locking_stopped() after taking the mutex to
-	   avoid a race where dlm_recoverd enables locking and runs
-	   process_requestqueue between our earlier dlm_locking_stopped check
-	   and this addition to the requestqueue. */
-
 	mutex_lock(&ls->ls_requestqueue_mutex);
-	if (dlm_locking_stopped(ls))
-		list_add_tail(&e->list, &ls->ls_requestqueue);
-	else {
-		log_debug(ls, "dlm_add_requestqueue skip from %d", nodeid);
-		kfree(e);
-		rv = -EAGAIN;
-	}
+	list_add_tail(&e->list, &ls->ls_requestqueue);
 	mutex_unlock(&ls->ls_requestqueue_mutex);
-	return rv;
 }
 
+/*
+ * Called by dlm_recoverd to process normal messages saved while recovery was
+ * happening.  Normal locking has been enabled before this is called.  dlm_recv,
+ * upon receiving a message, will wait for all saved messages to be drained
+ * here before processing the message it got.  If a new dlm_ls_stop() arrives
+ * while we're processing these saved messages, it may block trying to suspend
+ * dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue.  In that
+ * case, we don't abort since locking_stopped is still 0.  If dlm_recv is not
+ * waiting for us, then this processing may be aborted due to locking_stopped.
+ */
+
 int dlm_process_requestqueue(struct dlm_ls *ls)
 {
 	struct rq_entry *e;
-	struct dlm_header *hd;
 	int error = 0;
 
 	mutex_lock(&ls->ls_requestqueue_mutex);
@@ -79,14 +76,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 		e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
 		mutex_unlock(&ls->ls_requestqueue_mutex);
 
-		hd = (struct dlm_header *) e->request;
-		error = dlm_receive_message(hd, e->nodeid, 1);
-
-		if (error == -EINTR) {
-			/* entry is left on requestqueue */
-			log_debug(ls, "process_requestqueue abort eintr");
-			break;
-		}
+		dlm_receive_message_saved(ls, (struct dlm_message *)e->request);
 
 		mutex_lock(&ls->ls_requestqueue_mutex);
 		list_del(&e->list);
@@ -106,10 +96,12 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
 
 /*
  * After recovery is done, locking is resumed and dlm_recoverd takes all the
- * saved requests and processes them as they would have been by dlm_recvd.  At
- * the same time, dlm_recvd will start receiving new requests from remote
- * nodes.  We want to delay dlm_recvd processing new requests until
- * dlm_recoverd has finished processing the old saved requests.
+ * saved requests and processes them as they would have been by dlm_recv.  At
+ * the same time, dlm_recv will start receiving new requests from remote nodes.
+ * We want to delay dlm_recv processing new requests until dlm_recoverd has
+ * finished processing the old saved requests.  We don't check for locking
+ * stopped here because dlm_ls_stop won't stop locking until it's suspended us
+ * (dlm_recv).
  */
 
 void dlm_wait_requestqueue(struct dlm_ls *ls)
@@ -118,8 +110,6 @@ void dlm_wait_requestqueue(struct dlm_ls *ls)
 		mutex_lock(&ls->ls_requestqueue_mutex);
 		if (list_empty(&ls->ls_requestqueue))
 			break;
-		if (dlm_locking_stopped(ls))
-			break;
 		mutex_unlock(&ls->ls_requestqueue_mutex);
 		schedule();
 	}
diff --git a/fs/dlm/requestqueue.h b/fs/dlm/requestqueue.h
index 6a53ea0..aba34fc 100644
--- a/fs/dlm/requestqueue.h
+++ b/fs/dlm/requestqueue.h
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -13,7 +13,7 @@
 #ifndef __REQUESTQUEUE_DOT_H__
 #define __REQUESTQUEUE_DOT_H__
 
-int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd);
+void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd);
 int dlm_process_requestqueue(struct dlm_ls *ls);
 void dlm_wait_requestqueue(struct dlm_ls *ls);
 void dlm_purge_requestqueue(struct dlm_ls *ls);
-- 
1.5.1.2


  reply	other threads:[~2007-10-04  8:49 UTC|newest]

Thread overview: 106+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2007-10-04  8:48 [Cluster-devel] [GFS2/DLM] Pre-pull patch posting swhiteho
2007-10-04  8:48 ` swhiteho
2007-10-04  8:48 ` [Cluster-devel] [PATCH 01/51] [GFS2] Fix two races relating to glock callbacks swhiteho
2007-10-04  8:48   ` swhiteho
2007-10-04  8:48   ` [Cluster-devel] [PATCH 02/51] [GFS2] Fix calculation of demote state swhiteho
2007-10-04  8:48     ` swhiteho
2007-10-04  8:48     ` [Cluster-devel] [PATCH 03/51] [GFS2] Clean up duplicate includes in fs/gfs2/ swhiteho
2007-10-04  8:48       ` swhiteho
2007-10-04  8:48       ` [Cluster-devel] [PATCH 04/51] [GFS2] GFS2 not checking pointer on create when running under nfsd swhiteho
2007-10-04  8:48         ` swhiteho
2007-10-04  8:48         ` [Cluster-devel] [PATCH 05/51] [GFS2] Fix an oops in glock dumping swhiteho
2007-10-04  8:48           ` swhiteho
2007-10-04  8:48           ` [Cluster-devel] [PATCH 06/51] [GFS2] Move some code inside the log lock swhiteho
2007-10-04  8:48             ` swhiteho
2007-10-04  8:49             ` [Cluster-devel] [PATCH 07/51] [GFS2] Revert part of earlier log.c changes swhiteho
2007-10-04  8:49               ` swhiteho
2007-10-04  8:49               ` [Cluster-devel] [PATCH 08/51] [GFS2] Prevent infinite loop in try_rgrp_unlink() swhiteho
2007-10-04  8:49                 ` swhiteho
2007-10-04  8:49                 ` [Cluster-devel] [PATCH 09/51] [GFS2] use an temp variable to reduce a spin_unlock swhiteho
2007-10-04  8:49                   ` swhiteho
2007-10-04  8:49                   ` [Cluster-devel] [PATCH 10/51] [GFS2] Detach buf data during in-place writeback swhiteho
2007-10-04  8:49                     ` swhiteho
2007-10-04  8:49                     ` [Cluster-devel] [PATCH 11/51] [GFS2] mark struct *_operations const swhiteho
2007-10-04  8:49                       ` swhiteho
2007-10-04  8:49                       ` [Cluster-devel] [PATCH 12/51] [GFS2] use the declaration of gfs2_dops in the header file instead swhiteho
2007-10-04  8:49                         ` swhiteho
2007-10-04  8:49                         ` [Cluster-devel] [PATCH 13/51] [GFS2] Reduce number of gfs2_scand processes to one swhiteho
2007-10-04  8:49                           ` swhiteho
2007-10-04  8:49                           ` [Cluster-devel] [PATCH 14/51] [GFS2] invalid metadata block - REVISED swhiteho
2007-10-04  8:49                             ` swhiteho
2007-10-04  8:49                             ` [Cluster-devel] [PATCH 15/51] [GFS2] Ensure journal file cache is flushed after recovery swhiteho
2007-10-04  8:49                               ` swhiteho
2007-10-04  8:49                               ` [Cluster-devel] [PATCH 16/51] [GFS2] use list_for_each_entry instead swhiteho
2007-10-04  8:49                                 ` swhiteho
2007-10-04  8:49                                 ` [Cluster-devel] [PATCH 17/51] [GFS2] unneeded typecast swhiteho
2007-10-04  8:49                                   ` swhiteho
2007-10-04  8:49                                   ` [Cluster-devel] [PATCH 18/51] [GFS2] better code for translating characters swhiteho
2007-10-04  8:49                                     ` swhiteho
2007-10-04  8:49                                     ` [Cluster-devel] [PATCH 19/51] [GFS2] Force unstuff of hidden quota inode swhiteho
2007-10-04  8:49                                       ` swhiteho
2007-10-04  8:49                                       ` [Cluster-devel] [PATCH 20/51] [GFS2] fixed a NULL pointer assignment BUG swhiteho
2007-10-04  8:49                                         ` swhiteho
2007-10-04  8:49                                         ` [Cluster-devel] [PATCH 21/51] [GFS2] Fix quota do_list operation hang swhiteho
2007-10-04  8:49                                           ` swhiteho
2007-10-04  8:49                                           ` [Cluster-devel] [PATCH 22/51] [GFS2] Clean up invalidatepage/releasepage swhiteho
2007-10-04  8:49                                             ` swhiteho
2007-10-04  8:49                                             ` [Cluster-devel] [PATCH 23/51] [GFS2] Add a missing gfs2_trans_add_bh() swhiteho
2007-10-04  8:49                                               ` swhiteho
2007-10-04  8:49                                               ` [Cluster-devel] [PATCH 24/51] [GFS2] Add NULL entry to token table swhiteho
2007-10-04  8:49                                                 ` swhiteho
2007-10-04  8:49                                                 ` [Cluster-devel] [PATCH 25/51] [GFS2] Reduce truncate IO traffic swhiteho
2007-10-04  8:49                                                   ` swhiteho
2007-10-04  8:49                                                   ` [Cluster-devel] [PATCH 26/51] [DLM] Fix lowcomms socket closing swhiteho
2007-10-04  8:49                                                     ` swhiteho
2007-10-04  8:49                                                     ` [Cluster-devel] [PATCH 27/51] [GFS2] Wendy's dump lockname in hex & fix glock dump swhiteho
2007-10-04  8:49                                                       ` swhiteho
2007-10-04  8:49                                                       ` [Cluster-devel] [PATCH 28/51] [GFS2] Patch to protect sd_log_num_jdata swhiteho
2007-10-04  8:49                                                         ` swhiteho
2007-10-04  8:49                                                         ` [Cluster-devel] [PATCH 29/51] [GFS2] panic after can't parse mount arguments swhiteho
2007-10-04  8:49                                                           ` swhiteho
2007-10-04  8:49                                                           ` [Cluster-devel] [PATCH 30/51] [GFS2] delay glock demote for a minimum hold time swhiteho
2007-10-04  8:49                                                             ` swhiteho
2007-10-04  8:49                                                             ` [Cluster-devel] [PATCH 31/51] [GFS2] fix inode meta data corruption swhiteho
2007-10-04  8:49                                                               ` swhiteho
2007-10-04  8:49                                                               ` [Cluster-devel] [PATCH 32/51] [GFS2] Correct lock ordering in unlink swhiteho
2007-10-04  8:49                                                                 ` swhiteho
2007-10-04  8:49                                                                 ` [Cluster-devel] [PATCH 33/51] [GFS2] Introduce gfs2_remove_from_ail swhiteho
2007-10-04  8:49                                                                   ` swhiteho
2007-10-04  8:49                                                                   ` [Cluster-devel] [PATCH 34/51] [GFS2] Don't mark jdata dirty in gfs2_unstuffer_page() swhiteho
2007-10-04  8:49                                                                     ` swhiteho
2007-10-04  8:49                                                                     ` [Cluster-devel] [PATCH 35/51] [GFS2] Move pin/unpin into lops.c, clean up locking swhiteho
2007-10-04  8:49                                                                       ` swhiteho
2007-10-04  8:49                                                                       ` [Cluster-devel] [PATCH 36/51] [GFS2] Clean up ordered write code swhiteho
2007-10-04  8:49                                                                         ` swhiteho
2007-10-04  8:49                                                                         ` [Cluster-devel] [PATCH 37/51] [GFS2] Fix ordering of dirty/journal for ordered buffer unstuffing swhiteho
2007-10-04  8:49                                                                           ` swhiteho
2007-10-04  8:49                                                                           ` [Cluster-devel] [PATCH 38/51] [GFS2] Replace revoke structure with bufdata structure swhiteho
2007-10-04  8:49                                                                             ` swhiteho
2007-10-04  8:49                                                                             ` [Cluster-devel] [PATCH 39/51] [GFS2] Use slab operations for all gfs2_bufdata allocations swhiteho
2007-10-04  8:49                                                                               ` swhiteho
2007-10-04  8:49                                                                               ` [Cluster-devel] [PATCH 40/51] [GFS2] Clean up gfs2_trans_add_revoke() swhiteho
2007-10-04  8:49                                                                                 ` swhiteho
2007-10-04  8:49                                                                                 ` [Cluster-devel] [PATCH 41/51] [GFS2] flocks from same process trip kernel BUG at fs/gfs2/glock.c:1118! swhiteho
2007-10-04  8:49                                                                                   ` swhiteho
2007-10-04  8:49                                                                                   ` [Cluster-devel] [PATCH 42/51] [GFS2] Move inode deletion out of blocking_cb swhiteho
2007-10-04  8:49                                                                                     ` swhiteho
2007-10-04  8:49                                                                                     ` [Cluster-devel] [PATCH 43/51] [DLM] Make dlm_sendd cond_resched more swhiteho
2007-10-04  8:49                                                                                       ` swhiteho
2007-10-04  8:49                                                                                       ` [Cluster-devel] [PATCH 44/51] [GFS2] GFS2: chmod hung - fix race in thread creation swhiteho
2007-10-04  8:49                                                                                         ` swhiteho
2007-10-04  8:49                                                                                         ` [Cluster-devel] [PATCH 45/51] [GFS2] Clean up journaled data writing swhiteho
2007-10-04  8:49                                                                                           ` swhiteho
2007-10-04  8:49                                                                                           ` [Cluster-devel] [PATCH 46/51] [GFS2] Data corruption fix swhiteho
2007-10-04  8:49                                                                                             ` swhiteho
2007-10-04  8:49                                                                                             ` [Cluster-devel] [PATCH 47/51] [GFS2] Alternate gfs2_iget to avoid looking up inodes being freed swhiteho
2007-10-04  8:49                                                                                               ` swhiteho
2007-10-04  8:49                                                                                               ` [Cluster-devel] [PATCH 48/51] [GFS2] Don't try to remove buffers that don't exist swhiteho
2007-10-04  8:49                                                                                                 ` swhiteho
2007-10-04  8:49                                                                                                 ` [Cluster-devel] [PATCH 49/51] [GFS2] Get superblock a different way swhiteho
2007-10-04  8:49                                                                                                   ` swhiteho
2007-10-04  8:49                                                                                                   ` [Cluster-devel] [PATCH 50/51] [DLM] don't overwrite castparam if it's NULL swhiteho
2007-10-04  8:49                                                                                                     ` swhiteho
2007-10-04  8:49                                                                                                     ` swhiteho [this message]
2007-10-04  8:49                                                                                                       ` [PATCH 51/51] [DLM] block dlm_recv in recovery transition swhiteho
2007-10-12  7:47 ` [Cluster-devel] [GFS2/DLM] Pull request Steven Whitehouse
2007-10-12  7:47   ` Steven Whitehouse

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1191487882346-git-send-email-swhiteho@redhat.com \
    --to=swhiteho@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.