All of lore.kernel.org
 help / color / mirror / Atom feed
From: Steven Whitehouse <swhiteho@redhat.com>
To: cluster-devel.redhat.com
Subject: [Cluster-devel] [DLM] overlapping cancel and unlock [13/34]
Date: Tue, 01 May 2007 11:09:12 +0100	[thread overview]
Message-ID: <1178014152.5462.153.camel@quoit.chygwyn.com> (raw)
In-Reply-To: <1178013376.5462.127.camel@quoit.chygwyn.com>

From ef0c2bb05f40f9a0cd2deae63e199bfa62faa7fa Mon Sep 17 00:00:00 2001
From: David Teigland <teigland@redhat.com>
Date: Wed, 28 Mar 2007 09:56:46 -0500
Subject: [PATCH] [DLM] overlapping cancel and unlock

Full cancel and force-unlock support.  In the past, cancel and force-unlock
wouldn't work if there was another operation in progress on the lock.  Now,
both cancel and unlock-force can overlap an operation on a lock, meaning there
may be 2 or 3 operations in progress on a lock in parallel.  This support is
important not only because cancel and force-unlock are explicit operations
that an app can use, but both are used implicitly when a process exits while
holding locks.

Summary of changes:

- add-to and remove-from waiters functions were rewritten to handle situations
  with more than one remote operation outstanding on a lock

- validate_unlock_args detects when an overlapping cancel/unlock-force
  can be sent and when it needs to be delayed until a request/lookup
  reply is received

- processing request/lookup replies detects when cancel/unlock-force
  occurred during the op, and carries out the delayed cancel/unlock-force

- manipulation of the "waiters" (remote operation) state of a lock moved under
  the standard rsb mutex that protects all the other lock state

- the two recovery routines related to locks on the waiters list changed
  according to the way lkb's are now locked before accessing waiters state

- waiters recovery detects when lkb's being recovered have overlapping
  cancel/unlock-force, and may not recover such locks

- revert_lock (cancel) returns a value to distinguish cases where it did
  nothing vs cases where it actually did a cancel; the cancel completion ast
  should only be done when cancel did something

- orphaned locks put on new list so they can be found later for purging

- cancel must be called on a lock when making it an orphan

- flag user locks (ENDOFLIFE) at the end of their useful life (to the
  application) so we can return an error for any further cancel/unlock-force

- we weren't setting COMP/BAST ast flags if one was already set, so we'd lose
  either a completion or blocking ast

- clear an unread bast on a lock that's become unlocked

Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 61d9320..178931c 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -2,7 +2,7 @@

*******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights
reserved.
-**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to
use,
 **  modify, copy, or redistribute it subject to the terms and
conditions
@@ -210,6 +210,9 @@ struct dlm_args {
 #define DLM_IFL_MSTCPY		0x00010000
 #define DLM_IFL_RESEND		0x00020000
 #define DLM_IFL_DEAD		0x00040000
+#define DLM_IFL_OVERLAP_UNLOCK  0x00080000
+#define DLM_IFL_OVERLAP_CANCEL  0x00100000
+#define DLM_IFL_ENDOFLIFE	0x00200000
 #define DLM_IFL_USER		0x00000001
 #define DLM_IFL_ORPHAN		0x00000002
 
@@ -230,8 +233,8 @@ struct dlm_lkb {
 	int8_t			lkb_grmode;	/* granted lock mode */
 	int8_t			lkb_bastmode;	/* requested mode */
 	int8_t			lkb_highbast;	/* highest mode bast sent for */
-
 	int8_t			lkb_wait_type;	/* type of reply waiting for */
+	int8_t			lkb_wait_count;
 	int8_t			lkb_ast_type;	/* type of ast queued for */
 
 	struct list_head	lkb_idtbl_list;	/* lockspace lkbtbl */
@@ -440,6 +443,9 @@ struct dlm_ls {
 	struct mutex		ls_waiters_mutex;
 	struct list_head	ls_waiters;	/* lkbs needing a reply */
 
+	struct mutex		ls_orphans_mutex;
+	struct list_head	ls_orphans;
+
 	struct list_head	ls_nodes;	/* current nodes in ls */
 	struct list_head	ls_nodes_gone;	/* dead node list, recovery */
 	int			ls_num_nodes;	/* number of nodes in ls */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index e725005..b865a46 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1,7 +1,7 @@
 /******************************************************************************

*******************************************************************************
 **
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to
use,
 **  modify, copy, or redistribute it subject to the terms and
conditions
@@ -254,6 +254,22 @@ static inline int down_conversion(struct dlm_lkb
*lkb)
 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
 }
 
+static inline int is_overlap_unlock(struct dlm_lkb *lkb)
+{
+	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
+}
+
+static inline int is_overlap_cancel(struct dlm_lkb *lkb)
+{
+	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
+}
+
+static inline int is_overlap(struct dlm_lkb *lkb)
+{
+	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
+				  DLM_IFL_OVERLAP_CANCEL));
+}
+
 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
 {
 	if (is_master_copy(lkb))
@@ -267,6 +283,12 @@ static void queue_cast(struct dlm_rsb *r, struct
dlm_lkb *lkb, int rv)
 	dlm_add_ast(lkb, AST_COMP);
 }
 
+static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb
*lkb)
+{
+	queue_cast(r, lkb,
+		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
+}
+
 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int
rqmode)
 {
 	if (is_master_copy(lkb))
@@ -547,6 +569,7 @@ static int create_lkb(struct dlm_ls *ls, struct
dlm_lkb **lkb_ret)
 	lkb->lkb_grmode = DLM_LOCK_IV;
 	kref_init(&lkb->lkb_ref);
 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
+	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
 
 	get_random_bytes(&bucket, sizeof(bucket));
 	bucket &= (ls->ls_lkbtbl_size - 1);
@@ -735,23 +758,75 @@ static void move_lkb(struct dlm_rsb *r, struct
dlm_lkb *lkb, int sts)
 	unhold_lkb(lkb);
 }
 
+static int msg_reply_type(int mstype)
+{
+	switch (mstype) {
+	case DLM_MSG_REQUEST:
+		return DLM_MSG_REQUEST_REPLY;
+	case DLM_MSG_CONVERT:
+		return DLM_MSG_CONVERT_REPLY;
+	case DLM_MSG_UNLOCK:
+		return DLM_MSG_UNLOCK_REPLY;
+	case DLM_MSG_CANCEL:
+		return DLM_MSG_CANCEL_REPLY;
+	case DLM_MSG_LOOKUP:
+		return DLM_MSG_LOOKUP_REPLY;
+	}
+	return -1;
+}
+
 /* add/remove lkb from global waiters list of lkb's waiting for
    a reply from a remote node */
 
-static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
+static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
 {
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+	int error = 0;
 
 	mutex_lock(&ls->ls_waiters_mutex);
-	if (lkb->lkb_wait_type) {
-		log_print("add_to_waiters error %d", lkb->lkb_wait_type);
+
+	if (is_overlap_unlock(lkb) ||
+	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
+		error = -EINVAL;
+		goto out;
+	}
+
+	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
+		switch (mstype) {
+		case DLM_MSG_UNLOCK:
+			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
+			break;
+		case DLM_MSG_CANCEL:
+			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
+			break;
+		default:
+			error = -EBUSY;
+			goto out;
+		}
+		lkb->lkb_wait_count++;
+		hold_lkb(lkb);
+
+		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
+			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
+			  lkb->lkb_wait_count, lkb->lkb_flags);
 		goto out;
 	}
+
+	DLM_ASSERT(!lkb->lkb_wait_count,
+		   dlm_print_lkb(lkb);
+		   printk("wait_count %d\n", lkb->lkb_wait_count););
+
+	lkb->lkb_wait_count++;
 	lkb->lkb_wait_type = mstype;
-	kref_get(&lkb->lkb_ref);
+	hold_lkb(lkb);
 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
  out:
+	if (error)
+		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
+			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
+			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
 	mutex_unlock(&ls->ls_waiters_mutex);
+	return error;
 }
 
 /* We clear the RESEND flag because we might be taking an lkb off the
waiters
@@ -759,34 +834,85 @@ static void add_to_waiters(struct dlm_lkb *lkb,
int mstype)
    request reply on the requestqueue) between dlm_recover_waiters_pre()
which
    set RESEND and dlm_recover_waiters_post() */
 
-static int _remove_from_waiters(struct dlm_lkb *lkb)
+static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 {
-	int error = 0;
+	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+	int overlap_done = 0;
 
-	if (!lkb->lkb_wait_type) {
-		log_print("remove_from_waiters error");
-		error = -EINVAL;
-		goto out;
+	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+		overlap_done = 1;
+		goto out_del;
 	}
-	lkb->lkb_wait_type = 0;
+
+	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+		overlap_done = 1;
+		goto out_del;
+	}
+
+	/* N.B. type of reply may not always correspond to type of original
+	   msg due to lookup->request optimization, verify others? */
+
+	if (lkb->lkb_wait_type) {
+		lkb->lkb_wait_type = 0;
+		goto out_del;
+	}
+
+	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
+		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
+	return -1;
+
+ out_del:
+	/* the force-unlock/cancel has completed and we haven't recvd a reply
+	   to the op that was in progress prior to the unlock/cancel; we
+	   give up on any reply to the earlier op.  FIXME: not sure when/how
+	   this would happen */
+
+	if (overlap_done && lkb->lkb_wait_type) {
+		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
+			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
+		lkb->lkb_wait_count--;
+		lkb->lkb_wait_type = 0;
+	}
+
+	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
+
 	lkb->lkb_flags &= ~DLM_IFL_RESEND;
-	list_del(&lkb->lkb_wait_reply);
+	lkb->lkb_wait_count--;
+	if (!lkb->lkb_wait_count)
+		list_del_init(&lkb->lkb_wait_reply);
 	unhold_lkb(lkb);
- out:
-	return error;
+	return 0;
 }
 
-static int remove_from_waiters(struct dlm_lkb *lkb)
+static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 {
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int error;
 
 	mutex_lock(&ls->ls_waiters_mutex);
-	error = _remove_from_waiters(lkb);
+	error = _remove_from_waiters(lkb, mstype);
 	mutex_unlock(&ls->ls_waiters_mutex);
 	return error;
 }
 
+/* Handles situations where we might be processing a "fake" or "stub"
reply in
+   which we can't try to take waiters_mutex again. */
+
+static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct
dlm_message *ms)
+{
+	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+	int error;
+
+	if (ms != &ls->ls_stub_ms)
+		mutex_lock(&ls->ls_waiters_mutex);
+	error = _remove_from_waiters(lkb, ms->m_type);
+	if (ms != &ls->ls_stub_ms)
+		mutex_unlock(&ls->ls_waiters_mutex);
+	return error;
+}
+
 static void dir_remove(struct dlm_rsb *r)
 {
 	int to_nodeid;
@@ -988,8 +1114,14 @@ static void remove_lock_pc(struct dlm_rsb *r,
struct dlm_lkb *lkb)
 	_remove_lock(r, lkb);
 }
 
-static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+/* returns: 0 did nothing
+	    1 moved lock to granted
+	   -1 removed lock */
+
+static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
+	int rv = 0;
+
 	lkb->lkb_rqmode = DLM_LOCK_IV;
 
 	switch (lkb->lkb_status) {
@@ -997,6 +1129,7 @@ static void revert_lock(struct dlm_rsb *r, struct
dlm_lkb *lkb)
 		break;
 	case DLM_LKSTS_CONVERT:
 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
+		rv = 1;
 		break;
 	case DLM_LKSTS_WAITING:
 		del_lkb(r, lkb);
@@ -1004,15 +1137,17 @@ static void revert_lock(struct dlm_rsb *r,
struct dlm_lkb *lkb)
 		/* this unhold undoes the original ref from create_lkb()
 		   so this leads to the lkb being freed */
 		unhold_lkb(lkb);
+		rv = -1;
 		break;
 	default:
 		log_print("invalid status for revert %d", lkb->lkb_status);
 	}
+	return rv;
 }
 
-static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
-	revert_lock(r, lkb);
+	return revert_lock(r, lkb);
 }
 
 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -1499,7 +1634,7 @@ static void process_lookup_list(struct dlm_rsb *r)
 	struct dlm_lkb *lkb, *safe;
 
 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
-		list_del(&lkb->lkb_rsb_lookup);
+		list_del_init(&lkb->lkb_rsb_lookup);
 		_request_lock(r, lkb);
 		schedule();
 	}
@@ -1530,7 +1665,7 @@ static void confirm_master(struct dlm_rsb *r, int
error)
 		if (!list_empty(&r->res_lookup)) {
 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
 					 lkb_rsb_lookup);
-			list_del(&lkb->lkb_rsb_lookup);
+			list_del_init(&lkb->lkb_rsb_lookup);
 			r->res_first_lkid = lkb->lkb_id;
 			_request_lock(r, lkb);
 		} else
@@ -1614,6 +1749,9 @@ static int set_unlock_args(uint32_t flags, void
*astarg, struct dlm_args *args)
  		      DLM_LKF_FORCEUNLOCK))
 		return -EINVAL;
 
+	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
+		return -EINVAL;
+
 	args->flags = flags;
 	args->astparam = (long) astarg;
 	return 0;
@@ -1638,6 +1776,9 @@ static int validate_lock_args(struct dlm_ls *ls,
struct dlm_lkb *lkb,
 
 		if (lkb->lkb_wait_type)
 			goto out;
+
+		if (is_overlap(lkb))
+			goto out;
 	}
 
 	lkb->lkb_exflags = args->flags;
@@ -1654,35 +1795,126 @@ static int validate_lock_args(struct dlm_ls
*ls, struct dlm_lkb *lkb,
 	return rv;
 }
 
+/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
+   for success */
+
+/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
+   because there may be a lookup in progress and it's valid to do
+   cancel/unlockf on it */
+
 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args
*args)
 {
+	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int rv = -EINVAL;
 
-	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
+	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
+		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
+		dlm_print_lkb(lkb);
 		goto out;
+	}
 
-	if (args->flags & DLM_LKF_FORCEUNLOCK)
-		goto out_ok;
+	/* an lkb may still exist even though the lock is EOL'ed due to a
+	   cancel, unlock or failed noqueue request; an app can't use these
+	   locks; return same error as if the lkid had not been found at all
*/
 
-	if (args->flags & DLM_LKF_CANCEL &&
-	    lkb->lkb_status == DLM_LKSTS_GRANTED)
+	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
+		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
+		rv = -ENOENT;
 		goto out;
+	}
 
-	if (!(args->flags & DLM_LKF_CANCEL) &&
-	    lkb->lkb_status != DLM_LKSTS_GRANTED)
-		goto out;
+	/* an lkb may be waiting for an rsb lookup to complete where the
+	   lookup was initiated by another lock */
+
+	if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
+		if (!list_empty(&lkb->lkb_rsb_lookup)) {
+			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
+			list_del_init(&lkb->lkb_rsb_lookup);
+			queue_cast(lkb->lkb_resource, lkb,
+				   args->flags & DLM_LKF_CANCEL ?
+				   -DLM_ECANCEL : -DLM_EUNLOCK);
+			unhold_lkb(lkb); /* undoes create_lkb() */
+			rv = -EBUSY;
+			goto out;
+		}
+	}
+
+	/* cancel not allowed with another cancel/unlock in progress */
+
+	if (args->flags & DLM_LKF_CANCEL) {
+		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
+			goto out;
+
+		if (is_overlap(lkb))
+			goto out;
+
+		if (lkb->lkb_flags & DLM_IFL_RESEND) {
+			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
+			rv = -EBUSY;
+			goto out;
+		}
+
+		switch (lkb->lkb_wait_type) {
+		case DLM_MSG_LOOKUP:
+		case DLM_MSG_REQUEST:
+			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
+			rv = -EBUSY;
+			goto out;
+		case DLM_MSG_UNLOCK:
+		case DLM_MSG_CANCEL:
+			goto out;
+		}
+		/* add_to_waiters() will set OVERLAP_CANCEL */
+		goto out_ok;
+	}
+
+	/* do we need to allow a force-unlock if there's a normal unlock
+	   already in progress?  in what conditions could the normal unlock
+	   fail such that we'd want to send a force-unlock to be sure? */
+
+	if (args->flags & DLM_LKF_FORCEUNLOCK) {
+		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
+			goto out;
+
+		if (is_overlap_unlock(lkb))
+			goto out;
 
+		if (lkb->lkb_flags & DLM_IFL_RESEND) {
+			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
+			rv = -EBUSY;
+			goto out;
+		}
+
+		switch (lkb->lkb_wait_type) {
+		case DLM_MSG_LOOKUP:
+		case DLM_MSG_REQUEST:
+			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
+			rv = -EBUSY;
+			goto out;
+		case DLM_MSG_UNLOCK:
+			goto out;
+		}
+		/* add_to_waiters() will set OVERLAP_UNLOCK */
+		goto out_ok;
+	}
+
+	/* normal unlock not allowed if there's any op in progress */
 	rv = -EBUSY;
-	if (lkb->lkb_wait_type)
+	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
 		goto out;
 
  out_ok:
-	lkb->lkb_exflags = args->flags;
+	/* an overlapping op shouldn't blow away exflags from other op */
+	lkb->lkb_exflags |= args->flags;
 	lkb->lkb_sbflags = 0;
 	lkb->lkb_astparam = args->astparam;
-
 	rv = 0;
  out:
+	if (rv)
+		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
+			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
+			  args->flags, lkb->lkb_wait_type,
+			  lkb->lkb_resource->res_name);
 	return rv;
 }
 
@@ -1759,17 +1991,19 @@ static int do_unlock(struct dlm_rsb *r, struct
dlm_lkb *lkb)
 	return -DLM_EUNLOCK;
 }
 
-/* FIXME: if revert_lock() finds that the lkb is granted, we should
-   skip the queue_cast(ECANCEL).  It indicates that the request/convert
-   completed (and queued a normal ast) just before the cancel; we don't
-   want to clobber the sb_result for the normal ast with ECANCEL. */
+/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
  
 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
-	revert_lock(r, lkb);
-	queue_cast(r, lkb, -DLM_ECANCEL);
-	grant_pending_locks(r);
-	return -DLM_ECANCEL;
+	int error;
+
+	error = revert_lock(r, lkb);
+	if (error) {
+		queue_cast(r, lkb, -DLM_ECANCEL);
+		grant_pending_locks(r);
+		return -DLM_ECANCEL;
+	}
+	return 0;
 }
 
 /*
@@ -2035,6 +2269,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
 
 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
 		error = 0;
+	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL |
DLM_LKF_FORCEUNLOCK)))
+		error = 0;
  out_put:
 	dlm_put_lkb(lkb);
  out:
@@ -2176,7 +2412,9 @@ static int send_common(struct dlm_rsb *r, struct
dlm_lkb *lkb, int mstype)
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	add_to_waiters(lkb, mstype);
+	error = add_to_waiters(lkb, mstype);
+	if (error)
+		return error;
 
 	to_nodeid = r->res_nodeid;
 
@@ -2192,7 +2430,7 @@ static int send_common(struct dlm_rsb *r, struct
dlm_lkb *lkb, int mstype)
 	return 0;
 
  fail:
-	remove_from_waiters(lkb);
+	remove_from_waiters(lkb, msg_reply_type(mstype));
 	return error;
 }
 
@@ -2209,7 +2447,8 @@ static int send_convert(struct dlm_rsb *r, struct
dlm_lkb *lkb)
 
 	/* down conversions go without a reply from the master */
 	if (!error && down_conversion(lkb)) {
-		remove_from_waiters(lkb);
+		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
+		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
 		r->res_ls->ls_stub_ms.m_result = 0;
 		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
@@ -2280,7 +2519,9 @@ static int send_lookup(struct dlm_rsb *r, struct
dlm_lkb *lkb)
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	add_to_waiters(lkb, DLM_MSG_LOOKUP);
+	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
+	if (error)
+		return error;
 
 	to_nodeid = dlm_dir_nodeid(r);
 
@@ -2296,7 +2537,7 @@ static int send_lookup(struct dlm_rsb *r, struct
dlm_lkb *lkb)
 	return 0;
 
  fail:
-	remove_from_waiters(lkb);
+	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
 	return error;
 }
 
@@ -2740,7 +2981,7 @@ static void receive_request_reply(struct dlm_ls
*ls, struct dlm_message *ms)
 {
 	struct dlm_lkb *lkb;
 	struct dlm_rsb *r;
-	int error, mstype;
+	int error, mstype, result;
 
 	error = find_lkb(ls, ms->m_remid, &lkb);
 	if (error) {
@@ -2749,20 +2990,15 @@ static void receive_request_reply(struct dlm_ls
*ls, struct dlm_message *ms)
 	}
 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-	mstype = lkb->lkb_wait_type;
-	error = remove_from_waiters(lkb);
-	if (error) {
-		log_error(ls, "receive_request_reply not on waiters");
-		goto out;
-	}
-
-	/* this is the value returned from do_request() on the master */
-	error = ms->m_result;
-
 	r = lkb->lkb_resource;
 	hold_rsb(r);
 	lock_rsb(r);
 
+	mstype = lkb->lkb_wait_type;
+	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
+	if (error)
+		goto out;
+
 	/* Optimization: the dir node was also the master, so it took our
 	   lookup as a request and sent request reply instead of lookup reply
*/
 	if (mstype == DLM_MSG_LOOKUP) {
@@ -2770,14 +3006,15 @@ static void receive_request_reply(struct dlm_ls
*ls, struct dlm_message *ms)
 		lkb->lkb_nodeid = r->res_nodeid;
 	}
 
-	switch (error) {
+	/* this is the value returned from do_request() on the master */
+	result = ms->m_result;
+
+	switch (result) {
 	case -EAGAIN:
-		/* request would block (be queued) on remote master;
-		   the unhold undoes the original ref from create_lkb()
-		   so it leads to the lkb being freed */
+		/* request would block (be queued) on remote master */
 		queue_cast(r, lkb, -EAGAIN);
 		confirm_master(r, -EAGAIN);
-		unhold_lkb(lkb);
+		unhold_lkb(lkb); /* undoes create_lkb() */
 		break;
 
 	case -EINPROGRESS:
@@ -2785,41 +3022,62 @@ static void receive_request_reply(struct dlm_ls
*ls, struct dlm_message *ms)
 		/* request was queued or granted on remote master */
 		receive_flags_reply(lkb, ms);
 		lkb->lkb_remid = ms->m_lkid;
-		if (error)
+		if (result)
 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
 		else {
 			grant_lock_pc(r, lkb, ms);
 			queue_cast(r, lkb, 0);
 		}
-		confirm_master(r, error);
+		confirm_master(r, result);
 		break;
 
 	case -EBADR:
 	case -ENOTBLK:
 		/* find_rsb failed to find rsb or rsb wasn't master */
+		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
+			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
 		r->res_nodeid = -1;
 		lkb->lkb_nodeid = -1;
-		_request_lock(r, lkb);
+
+		if (is_overlap(lkb)) {
+			/* we'll ignore error in cancel/unlock reply */
+			queue_cast_overlap(r, lkb);
+			unhold_lkb(lkb); /* undoes create_lkb() */
+		} else
+			_request_lock(r, lkb);
 		break;
 
 	default:
-		log_error(ls, "receive_request_reply error %d", error);
+		log_error(ls, "receive_request_reply %x error %d",
+			  lkb->lkb_id, result);
 	}
 
+	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS))
{
+		log_debug(ls, "receive_request_reply %x result %d unlock",
+			  lkb->lkb_id, result);
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+		send_unlock(r, lkb);
+	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
+		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+		send_cancel(r, lkb);
+	} else {
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+	}
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
- out:
 	dlm_put_lkb(lkb);
 }
 
 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb
*lkb,
 				    struct dlm_message *ms)
 {
-	int error = ms->m_result;
-
 	/* this is the value returned from do_convert() on the master */
-
-	switch (error) {
+	switch (ms->m_result) {
 	case -EAGAIN:
 		/* convert would block (be queued) on remote master */
 		queue_cast(r, lkb, -EAGAIN);
@@ -2839,19 +3097,26 @@ static void __receive_convert_reply(struct
dlm_rsb *r, struct dlm_lkb *lkb,
 		break;
 
 	default:
-		log_error(r->res_ls, "receive_convert_reply error %d", error);
+		log_error(r->res_ls, "receive_convert_reply %x error %d",
+			  lkb->lkb_id, ms->m_result);
 	}
 }
 
 static void _receive_convert_reply(struct dlm_lkb *lkb, struct
dlm_message *ms)
 {
 	struct dlm_rsb *r = lkb->lkb_resource;
+	int error;
 
 	hold_rsb(r);
 	lock_rsb(r);
 
-	__receive_convert_reply(r, lkb, ms);
+	/* stub reply can happen with waiters_mutex held */
+	error = remove_from_waiters_ms(lkb, ms);
+	if (error)
+		goto out;
 
+	__receive_convert_reply(r, lkb, ms);
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
 }
@@ -2868,37 +3133,38 @@ static void receive_convert_reply(struct dlm_ls
*ls, struct dlm_message *ms)
 	}
 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-	error = remove_from_waiters(lkb);
-	if (error) {
-		log_error(ls, "receive_convert_reply not on waiters");
-		goto out;
-	}
-
 	_receive_convert_reply(lkb, ms);
- out:
 	dlm_put_lkb(lkb);
 }
 
 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct
dlm_message *ms)
 {
 	struct dlm_rsb *r = lkb->lkb_resource;
-	int error = ms->m_result;
+	int error;
 
 	hold_rsb(r);
 	lock_rsb(r);
 
+	/* stub reply can happen with waiters_mutex held */
+	error = remove_from_waiters_ms(lkb, ms);
+	if (error)
+		goto out;
+
 	/* this is the value returned from do_unlock() on the master */
 
-	switch (error) {
+	switch (ms->m_result) {
 	case -DLM_EUNLOCK:
 		receive_flags_reply(lkb, ms);
 		remove_lock_pc(r, lkb);
 		queue_cast(r, lkb, -DLM_EUNLOCK);
 		break;
+	case -ENOENT:
+		break;
 	default:
-		log_error(r->res_ls, "receive_unlock_reply error %d", error);
+		log_error(r->res_ls, "receive_unlock_reply %x error %d",
+			  lkb->lkb_id, ms->m_result);
 	}
-
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
 }
@@ -2915,37 +3181,39 @@ static void receive_unlock_reply(struct dlm_ls
*ls, struct dlm_message *ms)
 	}
 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-	error = remove_from_waiters(lkb);
-	if (error) {
-		log_error(ls, "receive_unlock_reply not on waiters");
-		goto out;
-	}
-
 	_receive_unlock_reply(lkb, ms);
- out:
 	dlm_put_lkb(lkb);
 }
 
 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct
dlm_message *ms)
 {
 	struct dlm_rsb *r = lkb->lkb_resource;
-	int error = ms->m_result;
+	int error;
 
 	hold_rsb(r);
 	lock_rsb(r);
 
+	/* stub reply can happen with waiters_mutex held */
+	error = remove_from_waiters_ms(lkb, ms);
+	if (error)
+		goto out;
+
 	/* this is the value returned from do_cancel() on the master */
 
-	switch (error) {
+	switch (ms->m_result) {
 	case -DLM_ECANCEL:
 		receive_flags_reply(lkb, ms);
 		revert_lock_pc(r, lkb);
-		queue_cast(r, lkb, -DLM_ECANCEL);
+		if (ms->m_result)
+			queue_cast(r, lkb, -DLM_ECANCEL);
+		break;
+	case 0:
 		break;
 	default:
-		log_error(r->res_ls, "receive_cancel_reply error %d", error);
+		log_error(r->res_ls, "receive_cancel_reply %x error %d",
+			  lkb->lkb_id, ms->m_result);
 	}
-
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
 }
@@ -2962,14 +3230,7 @@ static void receive_cancel_reply(struct dlm_ls
*ls, struct dlm_message *ms)
 	}
 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-	error = remove_from_waiters(lkb);
-	if (error) {
-		log_error(ls, "receive_cancel_reply not on waiters");
-		goto out;
-	}
-
 	_receive_cancel_reply(lkb, ms);
- out:
 	dlm_put_lkb(lkb);
 }
 
@@ -2985,20 +3246,17 @@ static void receive_lookup_reply(struct dlm_ls
*ls, struct dlm_message *ms)
 		return;
 	}
 
-	error = remove_from_waiters(lkb);
-	if (error) {
-		log_error(ls, "receive_lookup_reply not on waiters");
-		goto out;
-	}
-
-	/* this is the value returned by dlm_dir_lookup on dir node
+	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
 	   FIXME: will a non-zero error ever be returned? */
-	error = ms->m_result;
 
 	r = lkb->lkb_resource;
 	hold_rsb(r);
 	lock_rsb(r);
 
+	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
+	if (error)
+		goto out;
+
 	ret_nodeid = ms->m_nodeid;
 	if (ret_nodeid == dlm_our_nodeid()) {
 		r->res_nodeid = 0;
@@ -3009,14 +3267,22 @@ static void receive_lookup_reply(struct dlm_ls
*ls, struct dlm_message *ms)
 		r->res_nodeid = ret_nodeid;
 	}
 
+	if (is_overlap(lkb)) {
+		log_debug(ls, "receive_lookup_reply %x unlock %x",
+			  lkb->lkb_id, lkb->lkb_flags);
+		queue_cast_overlap(r, lkb);
+		unhold_lkb(lkb); /* undoes create_lkb() */
+		goto out_list;
+	}
+
 	_request_lock(r, lkb);
 
+ out_list:
 	if (!ret_nodeid)
 		process_lookup_list(r);
-
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
- out:
 	dlm_put_lkb(lkb);
 }
 
@@ -3153,9 +3419,9 @@ static void recover_convert_waiter(struct dlm_ls
*ls, struct dlm_lkb *lkb)
 {
 	if (middle_conversion(lkb)) {
 		hold_lkb(lkb);
+		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
 		ls->ls_stub_ms.m_result = -EINPROGRESS;
 		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
-		_remove_from_waiters(lkb);
 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
 
 		/* Same special case as in receive_rcom_lock_args() */
@@ -3227,18 +3493,18 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 
 		case DLM_MSG_UNLOCK:
 			hold_lkb(lkb);
+			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
 			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
-			_remove_from_waiters(lkb);
 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
 			dlm_put_lkb(lkb);
 			break;
 
 		case DLM_MSG_CANCEL:
 			hold_lkb(lkb);
+			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
 			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
-			_remove_from_waiters(lkb);
 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
 			dlm_put_lkb(lkb);
 			break;
@@ -3252,37 +3518,47 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 	mutex_unlock(&ls->ls_waiters_mutex);
 }
 
-static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb
**lkb_ret)
+static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb;
-	int rv = 0;
+	int found = 0;
 
 	mutex_lock(&ls->ls_waiters_mutex);
 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
-			rv = lkb->lkb_wait_type;
-			_remove_from_waiters(lkb);
-			lkb->lkb_flags &= ~DLM_IFL_RESEND;
+			hold_lkb(lkb);
+			found = 1;
 			break;
 		}
 	}
 	mutex_unlock(&ls->ls_waiters_mutex);
 
-	if (!rv)
+	if (!found)
 		lkb = NULL;
-	*lkb_ret = lkb;
-	return rv;
+	return lkb;
 }
 
 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be
the
    master or dir-node for r.  Processing the lkb may result in it being
placed
    back on waiters. */
 
+/* We do this after normal locking has been enabled and any saved
messages
+   (in requestqueue) have been processed.  We should be confident that
at
+   this point we won't get or process a reply to any of these waiting
+   operations.  But, new ops may be coming in on the rsbs/locks here
from
+   userspace or remotely. */
+
+/* there may have been an overlap unlock/cancel prior to recovery or
after
+   recovery.  if before, the lkb may still have a pos wait_count; if
after, the
+   overlap flag would just have been set and nothing new sent.  we can
be
+   confident here than any replies to either the initial op or overlap
ops
+   prior to recovery have been received. */
+
 int dlm_recover_waiters_post(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb;
 	struct dlm_rsb *r;
-	int error = 0, mstype;
+	int error = 0, mstype, err, oc, ou;
 
 	while (1) {
 		if (dlm_locking_stopped(ls)) {
@@ -3291,48 +3567,78 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 			break;
 		}
 
-		mstype = remove_resend_waiter(ls, &lkb);
-		if (!mstype)
+		lkb = find_resend_waiter(ls);
+		if (!lkb)
 			break;
 
 		r = lkb->lkb_resource;
+		hold_rsb(r);
+		lock_rsb(r);
+
+		mstype = lkb->lkb_wait_type;
+		oc = is_overlap_cancel(lkb);
+		ou = is_overlap_unlock(lkb);
+		err = 0;
 
 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
 
-		switch (mstype) {
-
-		case DLM_MSG_LOOKUP:
-			hold_rsb(r);
-			lock_rsb(r);
-			_request_lock(r, lkb);
-			if (is_master(r))
-				confirm_master(r, 0);
-			unlock_rsb(r);
-			put_rsb(r);
-			break;
-
-		case DLM_MSG_REQUEST:
-			hold_rsb(r);
-			lock_rsb(r);
-			_request_lock(r, lkb);
-			if (is_master(r))
-				confirm_master(r, 0);
-			unlock_rsb(r);
-			put_rsb(r);
-			break;
-
-		case DLM_MSG_CONVERT:
-			hold_rsb(r);
-			lock_rsb(r);
-			_convert_lock(r, lkb);
-			unlock_rsb(r);
-			put_rsb(r);
-			break;
-
-		default:
-			log_error(ls, "recover_waiters_post type %d", mstype);
+		/* At this point we assume that we won't get a reply to any
+		   previous op or overlap op on this lock.  First, do a big
+		   remove_from_waiters() for all previous ops. */
+
+		lkb->lkb_flags &= ~DLM_IFL_RESEND;
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+		lkb->lkb_wait_type = 0;
+		lkb->lkb_wait_count = 0;
+		mutex_lock(&ls->ls_waiters_mutex);
+		list_del_init(&lkb->lkb_wait_reply);
+		mutex_unlock(&ls->ls_waiters_mutex);
+		unhold_lkb(lkb); /* for waiters list */
+
+		if (oc || ou) {
+			/* do an unlock or cancel instead of resending */
+			switch (mstype) {
+			case DLM_MSG_LOOKUP:
+			case DLM_MSG_REQUEST:
+				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
+							-DLM_ECANCEL);
+				unhold_lkb(lkb); /* undoes create_lkb() */
+				break;
+			case DLM_MSG_CONVERT:
+				if (oc) {
+					queue_cast(r, lkb, -DLM_ECANCEL);
+				} else {
+					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
+					_unlock_lock(r, lkb);
+				}
+				break;
+			default:
+				err = 1;
+			}
+		} else {
+			switch (mstype) {
+			case DLM_MSG_LOOKUP:
+			case DLM_MSG_REQUEST:
+				_request_lock(r, lkb);
+				if (is_master(r))
+					confirm_master(r, 0);
+				break;
+			case DLM_MSG_CONVERT:
+				_convert_lock(r, lkb);
+				break;
+			default:
+				err = 1;
+			}
 		}
+
+		if (err)
+			log_error(ls, "recover_waiters_post %x %d %x %d %d",
+			  	  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
+		unlock_rsb(r);
+		put_rsb(r);
+		dlm_put_lkb(lkb);
 	}
 
 	return error;
@@ -3684,7 +3990,7 @@ int dlm_user_request(struct dlm_ls *ls, struct
dlm_user_args *ua,
 
 	/* add this new lkb to the per-process list of locks */
 	spin_lock(&ua->proc->locks_spin);
-	kref_get(&lkb->lkb_ref);
+	hold_lkb(lkb);
 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
 	spin_unlock(&ua->proc->locks_spin);
  out:
@@ -3774,6 +4080,9 @@ int dlm_user_unlock(struct dlm_ls *ls, struct
dlm_user_args *ua_tmp,
 
 	if (error == -DLM_EUNLOCK)
 		error = 0;
+	/* from validate_unlock_args() */
+	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
+		error = 0;
 	if (error)
 		goto out_put;
 
@@ -3786,6 +4095,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct
dlm_user_args *ua_tmp,
 	dlm_put_lkb(lkb);
  out:
 	unlock_recovery(ls);
+	kfree(ua_tmp);
 	return error;
 }
 
@@ -3815,33 +4125,37 @@ int dlm_user_cancel(struct dlm_ls *ls, struct
dlm_user_args *ua_tmp,
 
 	if (error == -DLM_ECANCEL)
 		error = 0;
-	if (error)
-		goto out_put;
-
-	/* this lkb was removed from the WAITING queue */
-	if (lkb->lkb_grmode == DLM_LOCK_IV) {
-		spin_lock(&ua->proc->locks_spin);
-		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
-		spin_unlock(&ua->proc->locks_spin);
-	}
+	/* from validate_unlock_args() */
+	if (error == -EBUSY)
+		error = 0;
  out_put:
 	dlm_put_lkb(lkb);
  out:
 	unlock_recovery(ls);
+	kfree(ua_tmp);
 	return error;
 }
 
+/* lkb's that are removed from the waiters list by revert are just left
on the
+   orphans list with the granted orphan locks, to be freed by purge */
+
 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
+	struct dlm_args args;
+	int error;
 
-	if (ua->lksb.sb_lvbptr)
-		kfree(ua->lksb.sb_lvbptr);
-	kfree(ua);
-	lkb->lkb_astparam = (long)NULL;
+	hold_lkb(lkb);
+	mutex_lock(&ls->ls_orphans_mutex);
+	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
+	mutex_unlock(&ls->ls_orphans_mutex);
 
-	/* TODO: propogate to master if needed */
-	return 0;
+	set_unlock_args(0, ua, &args);
+
+	error = cancel_lock(ls, lkb, &args);
+	if (error == -DLM_ECANCEL)
+		error = 0;
+	return error;
 }
 
 /* The force flag allows the unlock to go ahead even if the lkb isn't
granted.
@@ -3853,10 +4167,6 @@ static int unlock_proc_lock(struct dlm_ls *ls,
struct dlm_lkb *lkb)
 	struct dlm_args args;
 	int error;
 
-	/* FIXME: we need to handle the case where the lkb is in limbo
-	   while the rsb is being looked up, currently we assert in
-	   _unlock_lock/is_remote because rsb nodeid is -1. */
-
 	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
 
 	error = unlock_lock(ls, lkb, &args);
@@ -3865,6 +4175,31 @@ static int unlock_proc_lock(struct dlm_ls *ls,
struct dlm_lkb *lkb)
 	return error;
 }
 
+/* We have to release clear_proc_locks mutex before calling
unlock_proc_lock()
+   (which does lock_rsb) due to deadlock with receiving a message that
does
+   lock_rsb followed by dlm_user_add_ast() */
+
+static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
+				     struct dlm_user_proc *proc)
+{
+	struct dlm_lkb *lkb = NULL;
+
+	mutex_lock(&ls->ls_clear_proc_locks);
+	if (list_empty(&proc->locks))
+		goto out;
+
+	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
+	list_del_init(&lkb->lkb_ownqueue);
+
+	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
+		lkb->lkb_flags |= DLM_IFL_ORPHAN;
+	else
+		lkb->lkb_flags |= DLM_IFL_DEAD;
+ out:
+	mutex_unlock(&ls->ls_clear_proc_locks);
+	return lkb;
+}
+
 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts()
which
    1) references lkb->ua which we free here and 2) adds lkbs to
proc->asts,
    which we clear here. */
@@ -3880,18 +4215,15 @@ void dlm_clear_proc_locks(struct dlm_ls *ls,
struct dlm_user_proc *proc)
 	struct dlm_lkb *lkb, *safe;
 
 	lock_recovery(ls);
-	mutex_lock(&ls->ls_clear_proc_locks);
 
-	list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
-		list_del_init(&lkb->lkb_ownqueue);
-
-		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
-			lkb->lkb_flags |= DLM_IFL_ORPHAN;
+	while (1) {
+		lkb = del_proc_lock(ls, proc);
+		if (!lkb)
+			break;
+		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
 			orphan_proc_lock(ls, lkb);
-		} else {
-			lkb->lkb_flags |= DLM_IFL_DEAD;
+		else
 			unlock_proc_lock(ls, lkb);
-		}
 
 		/* this removes the reference for the proc->locks list
 		   added by dlm_user_request, it may result in the lkb
@@ -3900,6 +4232,8 @@ void dlm_clear_proc_locks(struct dlm_ls *ls,
struct dlm_user_proc *proc)
 		dlm_put_lkb(lkb);
 	}
 
+	mutex_lock(&ls->ls_clear_proc_locks);
+
 	/* in-progress unlocks */
 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
 		list_del_init(&lkb->lkb_ownqueue);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index f40817b..f607ca2 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -2,7 +2,7 @@

*******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights
reserved.
-**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to
use,
 **  modify, copy, or redistribute it subject to the terms and
conditions
@@ -459,6 +459,8 @@ static int new_lockspace(char *name, int namelen,
void **lockspace,
 
 	INIT_LIST_HEAD(&ls->ls_waiters);
 	mutex_init(&ls->ls_waiters_mutex);
+	INIT_LIST_HEAD(&ls->ls_orphans);
+	mutex_init(&ls->ls_orphans_mutex);
 
 	INIT_LIST_HEAD(&ls->ls_nodes);
 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 27a75ce..c978c67 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2006 Red Hat, Inc.  All rights reserved.
+ * Copyright (C) 2006-2007 Red Hat, Inc.  All rights reserved.
  *
  * This copyrighted material is made available to anyone wishing to
use,
  * modify, copy, or redistribute it subject to the terms and conditions
@@ -128,35 +128,30 @@ static void compat_output(struct dlm_lock_result
*res,
 }
 #endif
 
+/* we could possibly check if the cancel of an orphan has resulted in
the lkb
+   being removed and then remove that lkb from the orphans list and
free it */
 
 void dlm_user_add_ast(struct dlm_lkb *lkb, int type)
 {
 	struct dlm_ls *ls;
 	struct dlm_user_args *ua;
 	struct dlm_user_proc *proc;
-	int remove_ownqueue = 0;
+	int eol = 0, ast_type;
 
-	/* dlm_clear_proc_locks() sets ORPHAN/DEAD flag on each
-	   lkb before dealing with it.  We need to check this
-	   flag before taking ls_clear_proc_locks mutex because if
-	   it's set, dlm_clear_proc_locks() holds the mutex. */
-
-	if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) {
-		/* log_print("user_add_ast skip1 %x", lkb->lkb_flags); */
+	if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD))
 		return;
-	}
 
 	ls = lkb->lkb_resource->res_ls;
 	mutex_lock(&ls->ls_clear_proc_locks);
 
 	/* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
 	   can't be delivered.  For ORPHAN's, dlm_clear_proc_locks() freed
-	   lkb->ua so we can't try to use it. */
+	   lkb->ua so we can't try to use it.  This second check is necessary
+	   for cases where a completion ast is received for an operation that
+	   began before clear_proc_locks did its cancel/unlock. */
 
-	if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) {
-		/* log_print("user_add_ast skip2 %x", lkb->lkb_flags); */
+	if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD))
 		goto out;
-	}
 
 	DLM_ASSERT(lkb->lkb_astparam, dlm_print_lkb(lkb););
 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
@@ -166,28 +161,42 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, int
type)
 		goto out;
 
 	spin_lock(&proc->asts_spin);
-	if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) {
+
+	ast_type = lkb->lkb_ast_type;
+	lkb->lkb_ast_type |= type;
+
+	if (!ast_type) {
 		kref_get(&lkb->lkb_ref);
 		list_add_tail(&lkb->lkb_astqueue, &proc->asts);
-		lkb->lkb_ast_type |= type;
 		wake_up_interruptible(&proc->wait);
 	}
-
-	/* noqueue requests that fail may need to be removed from the
-	   proc's locks list, there should be a better way of detecting
-	   this situation than checking all these things... */
-
-	if (type == AST_COMP && lkb->lkb_grmode == DLM_LOCK_IV &&
-	    ua->lksb.sb_status == -EAGAIN && !list_empty(&lkb->lkb_ownqueue))
-		remove_ownqueue = 1;
-
-	/* unlocks or cancels of waiting requests need to be removed from the
-	   proc's unlocking list, again there must be a better way...  */
-
-	if (ua->lksb.sb_status == -DLM_EUNLOCK ||
+	if (type == AST_COMP && (ast_type & AST_COMP))
+		log_debug(ls, "ast overlap %x status %x %x",
+			  lkb->lkb_id, ua->lksb.sb_status, lkb->lkb_flags);
+
+	/* Figure out if this lock is at the end of its life and no longer
+	   available for the application to use.  The lkb still exists until
+	   the final ast is read.  A lock becomes EOL in three situations:
+	     1. a noqueue request fails with EAGAIN
+	     2. an unlock completes with EUNLOCK
+	     3. a cancel of a waiting request completes with ECANCEL
+	   An EOL lock needs to be removed from the process's list of locks.
+	   And we can't allow any new operation on an EOL lock.  This is
+	   not related to the lifetime of the lkb struct which is managed
+	   entirely by refcount. */
+
+	if (type == AST_COMP &&
+	    lkb->lkb_grmode == DLM_LOCK_IV &&
+	    ua->lksb.sb_status == -EAGAIN)
+		eol = 1;
+	else if (ua->lksb.sb_status == -DLM_EUNLOCK ||
 	    (ua->lksb.sb_status == -DLM_ECANCEL &&
 	     lkb->lkb_grmode == DLM_LOCK_IV))
-		remove_ownqueue = 1;
+		eol = 1;
+	if (eol) {
+		lkb->lkb_ast_type &= ~AST_BAST;
+		lkb->lkb_flags |= DLM_IFL_ENDOFLIFE;
+	}
 
 	/* We want to copy the lvb to userspace when the completion
 	   ast is read if the status is 0, the lock has an lvb and
@@ -204,11 +213,13 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, int
type)
 
 	spin_unlock(&proc->asts_spin);
 
-	if (remove_ownqueue) {
+	if (eol) {
 		spin_lock(&ua->proc->locks_spin);
-		list_del_init(&lkb->lkb_ownqueue);
+		if (!list_empty(&lkb->lkb_ownqueue)) {
+			list_del_init(&lkb->lkb_ownqueue);
+			dlm_put_lkb(lkb);
+		}
 		spin_unlock(&ua->proc->locks_spin);
-		dlm_put_lkb(lkb);
 	}
  out:
 	mutex_unlock(&ls->ls_clear_proc_locks);
-- 
1.5.1.2





WARNING: multiple messages have this Message-ID (diff)
From: Steven Whitehouse <swhiteho@redhat.com>
To: cluster-devel@redhat.com
Cc: linux-kernel@vger.kernel.org, David Teigland <teigland@redhat.com>
Subject: [DLM] overlapping cancel and unlock [13/34]
Date: Tue, 01 May 2007 11:09:12 +0100	[thread overview]
Message-ID: <1178014152.5462.153.camel@quoit.chygwyn.com> (raw)
In-Reply-To: <1178013376.5462.127.camel@quoit.chygwyn.com>

From ef0c2bb05f40f9a0cd2deae63e199bfa62faa7fa Mon Sep 17 00:00:00 2001
From: David Teigland <teigland@redhat.com>
Date: Wed, 28 Mar 2007 09:56:46 -0500
Subject: [PATCH] [DLM] overlapping cancel and unlock

Full cancel and force-unlock support.  In the past, cancel and force-unlock
wouldn't work if there was another operation in progress on the lock.  Now,
both cancel and unlock-force can overlap an operation on a lock, meaning there
may be 2 or 3 operations in progress on a lock in parallel.  This support is
important not only because cancel and force-unlock are explicit operations
that an app can use, but both are used implicitly when a process exits while
holding locks.

Summary of changes:

- add-to and remove-from waiters functions were rewritten to handle
situations
  with more than one remote operation outstanding on a lock

- validate_unlock_args detects when an overlapping cancel/unlock-force
  can be sent and when it needs to be delayed until a request/lookup
  reply is received

- processing request/lookup replies detects when cancel/unlock-force
  occurred during the op, and carries out the delayed cancel/unlock-force

- manipulation of the "waiters" (remote operation) state of a lock moved
under
  the standard rsb mutex that protects all the other lock state

- the two recovery routines related to locks on the waiters list changed
  according to the way lkb's are now locked before accessing waiters
state

- waiters recovery detects when lkb's being recovered have overlapping
  cancel/unlock-force, and may not recover such locks

- revert_lock (cancel) returns a value to distinguish cases where it did
  nothing vs cases where it actually did a cancel; the cancel completion
ast
  should only be done when cancel did something

- orphaned locks put on new list so they can be found later for purging

- cancel must be called on a lock when making it an orphan

- flag user locks (ENDOFLIFE) at the end of their useful life (to the
  application) so we can return an error for any further
cancel/unlock-force

- we weren't setting COMP/BAST ast flags if one was already set, so we'd
lose
  either a completion or blocking ast

- clear an unread bast on a lock that's become unlocked

Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>

diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 61d9320..178931c 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -2,7 +2,7 @@

*******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights
reserved.
-**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to
use,
 **  modify, copy, or redistribute it subject to the terms and
conditions
@@ -210,6 +210,9 @@ struct dlm_args {
 #define DLM_IFL_MSTCPY		0x00010000
 #define DLM_IFL_RESEND		0x00020000
 #define DLM_IFL_DEAD		0x00040000
+#define DLM_IFL_OVERLAP_UNLOCK  0x00080000
+#define DLM_IFL_OVERLAP_CANCEL  0x00100000
+#define DLM_IFL_ENDOFLIFE	0x00200000
 #define DLM_IFL_USER		0x00000001
 #define DLM_IFL_ORPHAN		0x00000002
 
@@ -230,8 +233,8 @@ struct dlm_lkb {
 	int8_t			lkb_grmode;	/* granted lock mode */
 	int8_t			lkb_bastmode;	/* requested mode */
 	int8_t			lkb_highbast;	/* highest mode bast sent for */
-
 	int8_t			lkb_wait_type;	/* type of reply waiting for */
+	int8_t			lkb_wait_count;
 	int8_t			lkb_ast_type;	/* type of ast queued for */
 
 	struct list_head	lkb_idtbl_list;	/* lockspace lkbtbl */
@@ -440,6 +443,9 @@ struct dlm_ls {
 	struct mutex		ls_waiters_mutex;
 	struct list_head	ls_waiters;	/* lkbs needing a reply */
 
+	struct mutex		ls_orphans_mutex;
+	struct list_head	ls_orphans;
+
 	struct list_head	ls_nodes;	/* current nodes in ls */
 	struct list_head	ls_nodes_gone;	/* dead node list, recovery */
 	int			ls_num_nodes;	/* number of nodes in ls */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index e725005..b865a46 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1,7 +1,7 @@
 /******************************************************************************

*******************************************************************************
 **
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to
use,
 **  modify, copy, or redistribute it subject to the terms and
conditions
@@ -254,6 +254,22 @@ static inline int down_conversion(struct dlm_lkb
*lkb)
 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
 }
 
+static inline int is_overlap_unlock(struct dlm_lkb *lkb)
+{
+	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
+}
+
+static inline int is_overlap_cancel(struct dlm_lkb *lkb)
+{
+	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
+}
+
+static inline int is_overlap(struct dlm_lkb *lkb)
+{
+	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
+				  DLM_IFL_OVERLAP_CANCEL));
+}
+
 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
 {
 	if (is_master_copy(lkb))
@@ -267,6 +283,12 @@ static void queue_cast(struct dlm_rsb *r, struct
dlm_lkb *lkb, int rv)
 	dlm_add_ast(lkb, AST_COMP);
 }
 
+static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb
*lkb)
+{
+	queue_cast(r, lkb,
+		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
+}
+
 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int
rqmode)
 {
 	if (is_master_copy(lkb))
@@ -547,6 +569,7 @@ static int create_lkb(struct dlm_ls *ls, struct
dlm_lkb **lkb_ret)
 	lkb->lkb_grmode = DLM_LOCK_IV;
 	kref_init(&lkb->lkb_ref);
 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
+	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
 
 	get_random_bytes(&bucket, sizeof(bucket));
 	bucket &= (ls->ls_lkbtbl_size - 1);
@@ -735,23 +758,75 @@ static void move_lkb(struct dlm_rsb *r, struct
dlm_lkb *lkb, int sts)
 	unhold_lkb(lkb);
 }
 
+static int msg_reply_type(int mstype)
+{
+	switch (mstype) {
+	case DLM_MSG_REQUEST:
+		return DLM_MSG_REQUEST_REPLY;
+	case DLM_MSG_CONVERT:
+		return DLM_MSG_CONVERT_REPLY;
+	case DLM_MSG_UNLOCK:
+		return DLM_MSG_UNLOCK_REPLY;
+	case DLM_MSG_CANCEL:
+		return DLM_MSG_CANCEL_REPLY;
+	case DLM_MSG_LOOKUP:
+		return DLM_MSG_LOOKUP_REPLY;
+	}
+	return -1;
+}
+
 /* add/remove lkb from global waiters list of lkb's waiting for
    a reply from a remote node */
 
-static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
+static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
 {
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+	int error = 0;
 
 	mutex_lock(&ls->ls_waiters_mutex);
-	if (lkb->lkb_wait_type) {
-		log_print("add_to_waiters error %d", lkb->lkb_wait_type);
+
+	if (is_overlap_unlock(lkb) ||
+	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
+		error = -EINVAL;
+		goto out;
+	}
+
+	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
+		switch (mstype) {
+		case DLM_MSG_UNLOCK:
+			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
+			break;
+		case DLM_MSG_CANCEL:
+			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
+			break;
+		default:
+			error = -EBUSY;
+			goto out;
+		}
+		lkb->lkb_wait_count++;
+		hold_lkb(lkb);
+
+		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
+			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
+			  lkb->lkb_wait_count, lkb->lkb_flags);
 		goto out;
 	}
+
+	DLM_ASSERT(!lkb->lkb_wait_count,
+		   dlm_print_lkb(lkb);
+		   printk("wait_count %d\n", lkb->lkb_wait_count););
+
+	lkb->lkb_wait_count++;
 	lkb->lkb_wait_type = mstype;
-	kref_get(&lkb->lkb_ref);
+	hold_lkb(lkb);
 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
  out:
+	if (error)
+		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
+			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
+			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
 	mutex_unlock(&ls->ls_waiters_mutex);
+	return error;
 }
 
 /* We clear the RESEND flag because we might be taking an lkb off the
waiters
@@ -759,34 +834,85 @@ static void add_to_waiters(struct dlm_lkb *lkb,
int mstype)
    request reply on the requestqueue) between dlm_recover_waiters_pre()
which
    set RESEND and dlm_recover_waiters_post() */
 
-static int _remove_from_waiters(struct dlm_lkb *lkb)
+static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 {
-	int error = 0;
+	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+	int overlap_done = 0;
 
-	if (!lkb->lkb_wait_type) {
-		log_print("remove_from_waiters error");
-		error = -EINVAL;
-		goto out;
+	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+		overlap_done = 1;
+		goto out_del;
 	}
-	lkb->lkb_wait_type = 0;
+
+	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+		overlap_done = 1;
+		goto out_del;
+	}
+
+	/* N.B. type of reply may not always correspond to type of original
+	   msg due to lookup->request optimization, verify others? */
+
+	if (lkb->lkb_wait_type) {
+		lkb->lkb_wait_type = 0;
+		goto out_del;
+	}
+
+	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
+		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
+	return -1;
+
+ out_del:
+	/* the force-unlock/cancel has completed and we haven't recvd a reply
+	   to the op that was in progress prior to the unlock/cancel; we
+	   give up on any reply to the earlier op.  FIXME: not sure when/how
+	   this would happen */
+
+	if (overlap_done && lkb->lkb_wait_type) {
+		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
+			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
+		lkb->lkb_wait_count--;
+		lkb->lkb_wait_type = 0;
+	}
+
+	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
+
 	lkb->lkb_flags &= ~DLM_IFL_RESEND;
-	list_del(&lkb->lkb_wait_reply);
+	lkb->lkb_wait_count--;
+	if (!lkb->lkb_wait_count)
+		list_del_init(&lkb->lkb_wait_reply);
 	unhold_lkb(lkb);
- out:
-	return error;
+	return 0;
 }
 
-static int remove_from_waiters(struct dlm_lkb *lkb)
+static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 {
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int error;
 
 	mutex_lock(&ls->ls_waiters_mutex);
-	error = _remove_from_waiters(lkb);
+	error = _remove_from_waiters(lkb, mstype);
 	mutex_unlock(&ls->ls_waiters_mutex);
 	return error;
 }
 
+/* Handles situations where we might be processing a "fake" or "stub"
reply in
+   which we can't try to take waiters_mutex again. */
+
+static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct
dlm_message *ms)
+{
+	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+	int error;
+
+	if (ms != &ls->ls_stub_ms)
+		mutex_lock(&ls->ls_waiters_mutex);
+	error = _remove_from_waiters(lkb, ms->m_type);
+	if (ms != &ls->ls_stub_ms)
+		mutex_unlock(&ls->ls_waiters_mutex);
+	return error;
+}
+
 static void dir_remove(struct dlm_rsb *r)
 {
 	int to_nodeid;
@@ -988,8 +1114,14 @@ static void remove_lock_pc(struct dlm_rsb *r,
struct dlm_lkb *lkb)
 	_remove_lock(r, lkb);
 }
 
-static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+/* returns: 0 did nothing
+	    1 moved lock to granted
+	   -1 removed lock */
+
+static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
+	int rv = 0;
+
 	lkb->lkb_rqmode = DLM_LOCK_IV;
 
 	switch (lkb->lkb_status) {
@@ -997,6 +1129,7 @@ static void revert_lock(struct dlm_rsb *r, struct
dlm_lkb *lkb)
 		break;
 	case DLM_LKSTS_CONVERT:
 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
+		rv = 1;
 		break;
 	case DLM_LKSTS_WAITING:
 		del_lkb(r, lkb);
@@ -1004,15 +1137,17 @@ static void revert_lock(struct dlm_rsb *r,
struct dlm_lkb *lkb)
 		/* this unhold undoes the original ref from create_lkb()
 		   so this leads to the lkb being freed */
 		unhold_lkb(lkb);
+		rv = -1;
 		break;
 	default:
 		log_print("invalid status for revert %d", lkb->lkb_status);
 	}
+	return rv;
 }
 
-static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
-	revert_lock(r, lkb);
+	return revert_lock(r, lkb);
 }
 
 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -1499,7 +1634,7 @@ static void process_lookup_list(struct dlm_rsb *r)
 	struct dlm_lkb *lkb, *safe;
 
 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
-		list_del(&lkb->lkb_rsb_lookup);
+		list_del_init(&lkb->lkb_rsb_lookup);
 		_request_lock(r, lkb);
 		schedule();
 	}
@@ -1530,7 +1665,7 @@ static void confirm_master(struct dlm_rsb *r, int
error)
 		if (!list_empty(&r->res_lookup)) {
 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
 					 lkb_rsb_lookup);
-			list_del(&lkb->lkb_rsb_lookup);
+			list_del_init(&lkb->lkb_rsb_lookup);
 			r->res_first_lkid = lkb->lkb_id;
 			_request_lock(r, lkb);
 		} else
@@ -1614,6 +1749,9 @@ static int set_unlock_args(uint32_t flags, void
*astarg, struct dlm_args *args)
  		      DLM_LKF_FORCEUNLOCK))
 		return -EINVAL;
 
+	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
+		return -EINVAL;
+
 	args->flags = flags;
 	args->astparam = (long) astarg;
 	return 0;
@@ -1638,6 +1776,9 @@ static int validate_lock_args(struct dlm_ls *ls,
struct dlm_lkb *lkb,
 
 		if (lkb->lkb_wait_type)
 			goto out;
+
+		if (is_overlap(lkb))
+			goto out;
 	}
 
 	lkb->lkb_exflags = args->flags;
@@ -1654,35 +1795,126 @@ static int validate_lock_args(struct dlm_ls
*ls, struct dlm_lkb *lkb,
 	return rv;
 }
 
+/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
+   for success */
+
+/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
+   because there may be a lookup in progress and it's valid to do
+   cancel/unlockf on it */
+
 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args
*args)
 {
+	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int rv = -EINVAL;
 
-	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
+	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
+		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
+		dlm_print_lkb(lkb);
 		goto out;
+	}
 
-	if (args->flags & DLM_LKF_FORCEUNLOCK)
-		goto out_ok;
+	/* an lkb may still exist even though the lock is EOL'ed due to a
+	   cancel, unlock or failed noqueue request; an app can't use these
+	   locks; return same error as if the lkid had not been found at all
*/
 
-	if (args->flags & DLM_LKF_CANCEL &&
-	    lkb->lkb_status == DLM_LKSTS_GRANTED)
+	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
+		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
+		rv = -ENOENT;
 		goto out;
+	}
 
-	if (!(args->flags & DLM_LKF_CANCEL) &&
-	    lkb->lkb_status != DLM_LKSTS_GRANTED)
-		goto out;
+	/* an lkb may be waiting for an rsb lookup to complete where the
+	   lookup was initiated by another lock */
+
+	if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
+		if (!list_empty(&lkb->lkb_rsb_lookup)) {
+			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
+			list_del_init(&lkb->lkb_rsb_lookup);
+			queue_cast(lkb->lkb_resource, lkb,
+				   args->flags & DLM_LKF_CANCEL ?
+				   -DLM_ECANCEL : -DLM_EUNLOCK);
+			unhold_lkb(lkb); /* undoes create_lkb() */
+			rv = -EBUSY;
+			goto out;
+		}
+	}
+
+	/* cancel not allowed with another cancel/unlock in progress */
+
+	if (args->flags & DLM_LKF_CANCEL) {
+		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
+			goto out;
+
+		if (is_overlap(lkb))
+			goto out;
+
+		if (lkb->lkb_flags & DLM_IFL_RESEND) {
+			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
+			rv = -EBUSY;
+			goto out;
+		}
+
+		switch (lkb->lkb_wait_type) {
+		case DLM_MSG_LOOKUP:
+		case DLM_MSG_REQUEST:
+			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
+			rv = -EBUSY;
+			goto out;
+		case DLM_MSG_UNLOCK:
+		case DLM_MSG_CANCEL:
+			goto out;
+		}
+		/* add_to_waiters() will set OVERLAP_CANCEL */
+		goto out_ok;
+	}
+
+	/* do we need to allow a force-unlock if there's a normal unlock
+	   already in progress?  in what conditions could the normal unlock
+	   fail such that we'd want to send a force-unlock to be sure? */
+
+	if (args->flags & DLM_LKF_FORCEUNLOCK) {
+		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
+			goto out;
+
+		if (is_overlap_unlock(lkb))
+			goto out;
 
+		if (lkb->lkb_flags & DLM_IFL_RESEND) {
+			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
+			rv = -EBUSY;
+			goto out;
+		}
+
+		switch (lkb->lkb_wait_type) {
+		case DLM_MSG_LOOKUP:
+		case DLM_MSG_REQUEST:
+			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
+			rv = -EBUSY;
+			goto out;
+		case DLM_MSG_UNLOCK:
+			goto out;
+		}
+		/* add_to_waiters() will set OVERLAP_UNLOCK */
+		goto out_ok;
+	}
+
+	/* normal unlock not allowed if there's any op in progress */
 	rv = -EBUSY;
-	if (lkb->lkb_wait_type)
+	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
 		goto out;
 
  out_ok:
-	lkb->lkb_exflags = args->flags;
+	/* an overlapping op shouldn't blow away exflags from other op */
+	lkb->lkb_exflags |= args->flags;
 	lkb->lkb_sbflags = 0;
 	lkb->lkb_astparam = args->astparam;
-
 	rv = 0;
  out:
+	if (rv)
+		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
+			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
+			  args->flags, lkb->lkb_wait_type,
+			  lkb->lkb_resource->res_name);
 	return rv;
 }
 
@@ -1759,17 +1991,19 @@ static int do_unlock(struct dlm_rsb *r, struct
dlm_lkb *lkb)
 	return -DLM_EUNLOCK;
 }
 
-/* FIXME: if revert_lock() finds that the lkb is granted, we should
-   skip the queue_cast(ECANCEL).  It indicates that the request/convert
-   completed (and queued a normal ast) just before the cancel; we don't
-   want to clobber the sb_result for the normal ast with ECANCEL. */
+/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
  
 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
-	revert_lock(r, lkb);
-	queue_cast(r, lkb, -DLM_ECANCEL);
-	grant_pending_locks(r);
-	return -DLM_ECANCEL;
+	int error;
+
+	error = revert_lock(r, lkb);
+	if (error) {
+		queue_cast(r, lkb, -DLM_ECANCEL);
+		grant_pending_locks(r);
+		return -DLM_ECANCEL;
+	}
+	return 0;
 }
 
 /*
@@ -2035,6 +2269,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
 
 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
 		error = 0;
+	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL |
DLM_LKF_FORCEUNLOCK)))
+		error = 0;
  out_put:
 	dlm_put_lkb(lkb);
  out:
@@ -2176,7 +2412,9 @@ static int send_common(struct dlm_rsb *r, struct
dlm_lkb *lkb, int mstype)
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	add_to_waiters(lkb, mstype);
+	error = add_to_waiters(lkb, mstype);
+	if (error)
+		return error;
 
 	to_nodeid = r->res_nodeid;
 
@@ -2192,7 +2430,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
 	return 0;
 
  fail:
-	remove_from_waiters(lkb);
+	remove_from_waiters(lkb, msg_reply_type(mstype));
 	return error;
 }
 
@@ -2209,7 +2447,8 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
 	/* down conversions go without a reply from the master */
 	if (!error && down_conversion(lkb)) {
-		remove_from_waiters(lkb);
+		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
+		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
 		r->res_ls->ls_stub_ms.m_result = 0;
 		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
@@ -2280,7 +2519,9 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	add_to_waiters(lkb, DLM_MSG_LOOKUP);
+	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
+	if (error)
+		return error;
 
 	to_nodeid = dlm_dir_nodeid(r);
 
@@ -2296,7 +2537,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
 	return 0;
 
  fail:
-	remove_from_waiters(lkb);
+	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
 	return error;
 }
 
@@ -2740,7 +2981,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 {
 	struct dlm_lkb *lkb;
 	struct dlm_rsb *r;
-	int error, mstype;
+	int error, mstype, result;
 
 	error = find_lkb(ls, ms->m_remid, &lkb);
 	if (error) {
@@ -2749,20 +2990,15 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	}
 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-	mstype = lkb->lkb_wait_type;
-	error = remove_from_waiters(lkb);
-	if (error) {
-		log_error(ls, "receive_request_reply not on waiters");
-		goto out;
-	}
-
-	/* this is the value returned from do_request() on the master */
-	error = ms->m_result;
-
 	r = lkb->lkb_resource;
 	hold_rsb(r);
 	lock_rsb(r);
 
+	mstype = lkb->lkb_wait_type;
+	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
+	if (error)
+		goto out;
+
 	/* Optimization: the dir node was also the master, so it took our
 	   lookup as a request and sent request reply instead of lookup reply */
 	if (mstype == DLM_MSG_LOOKUP) {
@@ -2770,14 +3006,15 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 		lkb->lkb_nodeid = r->res_nodeid;
 	}
 
-	switch (error) {
+	/* this is the value returned from do_request() on the master */
+	result = ms->m_result;
+
+	switch (result) {
 	case -EAGAIN:
-		/* request would block (be queued) on remote master;
-		   the unhold undoes the original ref from create_lkb()
-		   so it leads to the lkb being freed */
+		/* request would block (be queued) on remote master */
 		queue_cast(r, lkb, -EAGAIN);
 		confirm_master(r, -EAGAIN);
-		unhold_lkb(lkb);
+		unhold_lkb(lkb); /* undoes create_lkb() */
 		break;
 
 	case -EINPROGRESS:
@@ -2785,41 +3022,62 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 		/* request was queued or granted on remote master */
 		receive_flags_reply(lkb, ms);
 		lkb->lkb_remid = ms->m_lkid;
-		if (error)
+		if (result)
 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
 		else {
 			grant_lock_pc(r, lkb, ms);
 			queue_cast(r, lkb, 0);
 		}
-		confirm_master(r, error);
+		confirm_master(r, result);
 		break;
 
 	case -EBADR:
 	case -ENOTBLK:
 		/* find_rsb failed to find rsb or rsb wasn't master */
+		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
+			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
 		r->res_nodeid = -1;
 		lkb->lkb_nodeid = -1;
-		_request_lock(r, lkb);
+
+		if (is_overlap(lkb)) {
+			/* we'll ignore error in cancel/unlock reply */
+			queue_cast_overlap(r, lkb);
+			unhold_lkb(lkb); /* undoes create_lkb() */
+		} else
+			_request_lock(r, lkb);
 		break;
 
 	default:
-		log_error(ls, "receive_request_reply error %d", error);
+		log_error(ls, "receive_request_reply %x error %d",
+			  lkb->lkb_id, result);
 	}
 
+	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
+		log_debug(ls, "receive_request_reply %x result %d unlock",
+			  lkb->lkb_id, result);
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+		send_unlock(r, lkb);
+	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
+		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+		send_cancel(r, lkb);
+	} else {
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+	}
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
- out:
 	dlm_put_lkb(lkb);
 }
 
 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 				    struct dlm_message *ms)
 {
-	int error = ms->m_result;
-
 	/* this is the value returned from do_convert() on the master */
-
-	switch (error) {
+	switch (ms->m_result) {
 	case -EAGAIN:
 		/* convert would block (be queued) on remote master */
 		queue_cast(r, lkb, -EAGAIN);
@@ -2839,19 +3097,26 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 		break;
 
 	default:
-		log_error(r->res_ls, "receive_convert_reply error %d", error);
+		log_error(r->res_ls, "receive_convert_reply %x error %d",
+			  lkb->lkb_id, ms->m_result);
 	}
 }
 
 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 {
 	struct dlm_rsb *r = lkb->lkb_resource;
+	int error;
 
 	hold_rsb(r);
 	lock_rsb(r);
 
-	__receive_convert_reply(r, lkb, ms);
+	/* stub reply can happen with waiters_mutex held */
+	error = remove_from_waiters_ms(lkb, ms);
+	if (error)
+		goto out;
 
+	__receive_convert_reply(r, lkb, ms);
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
 }
@@ -2868,37 +3133,38 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	}
 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-	error = remove_from_waiters(lkb);
-	if (error) {
-		log_error(ls, "receive_convert_reply not on waiters");
-		goto out;
-	}
-
 	_receive_convert_reply(lkb, ms);
- out:
 	dlm_put_lkb(lkb);
 }
 
 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 {
 	struct dlm_rsb *r = lkb->lkb_resource;
-	int error = ms->m_result;
+	int error;
 
 	hold_rsb(r);
 	lock_rsb(r);
 
+	/* stub reply can happen with waiters_mutex held */
+	error = remove_from_waiters_ms(lkb, ms);
+	if (error)
+		goto out;
+
 	/* this is the value returned from do_unlock() on the master */
 
-	switch (error) {
+	switch (ms->m_result) {
 	case -DLM_EUNLOCK:
 		receive_flags_reply(lkb, ms);
 		remove_lock_pc(r, lkb);
 		queue_cast(r, lkb, -DLM_EUNLOCK);
 		break;
+	case -ENOENT:
+		break;
 	default:
-		log_error(r->res_ls, "receive_unlock_reply error %d", error);
+		log_error(r->res_ls, "receive_unlock_reply %x error %d",
+			  lkb->lkb_id, ms->m_result);
 	}
-
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
 }
@@ -2915,37 +3181,39 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	}
 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-	error = remove_from_waiters(lkb);
-	if (error) {
-		log_error(ls, "receive_unlock_reply not on waiters");
-		goto out;
-	}
-
 	_receive_unlock_reply(lkb, ms);
- out:
 	dlm_put_lkb(lkb);
 }
 
 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 {
 	struct dlm_rsb *r = lkb->lkb_resource;
-	int error = ms->m_result;
+	int error;
 
 	hold_rsb(r);
 	lock_rsb(r);
 
+	/* stub reply can happen with waiters_mutex held */
+	error = remove_from_waiters_ms(lkb, ms);
+	if (error)
+		goto out;
+
 	/* this is the value returned from do_cancel() on the master */
 
-	switch (error) {
+	switch (ms->m_result) {
 	case -DLM_ECANCEL:
 		receive_flags_reply(lkb, ms);
 		revert_lock_pc(r, lkb);
-		queue_cast(r, lkb, -DLM_ECANCEL);
+		if (ms->m_result)
+			queue_cast(r, lkb, -DLM_ECANCEL);
+		break;
+	case 0:
 		break;
 	default:
-		log_error(r->res_ls, "receive_cancel_reply error %d", error);
+		log_error(r->res_ls, "receive_cancel_reply %x error %d",
+			  lkb->lkb_id, ms->m_result);
 	}
-
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
 }
@@ -2962,14 +3230,7 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	}
 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-	error = remove_from_waiters(lkb);
-	if (error) {
-		log_error(ls, "receive_cancel_reply not on waiters");
-		goto out;
-	}
-
 	_receive_cancel_reply(lkb, ms);
- out:
 	dlm_put_lkb(lkb);
 }
 
@@ -2985,20 +3246,17 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 		return;
 	}
 
-	error = remove_from_waiters(lkb);
-	if (error) {
-		log_error(ls, "receive_lookup_reply not on waiters");
-		goto out;
-	}
-
-	/* this is the value returned by dlm_dir_lookup on dir node
+	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
 	   FIXME: will a non-zero error ever be returned? */
-	error = ms->m_result;
 
 	r = lkb->lkb_resource;
 	hold_rsb(r);
 	lock_rsb(r);
 
+	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
+	if (error)
+		goto out;
+
 	ret_nodeid = ms->m_nodeid;
 	if (ret_nodeid == dlm_our_nodeid()) {
 		r->res_nodeid = 0;
@@ -3009,14 +3267,22 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 		r->res_nodeid = ret_nodeid;
 	}
 
+	if (is_overlap(lkb)) {
+		log_debug(ls, "receive_lookup_reply %x unlock %x",
+			  lkb->lkb_id, lkb->lkb_flags);
+		queue_cast_overlap(r, lkb);
+		unhold_lkb(lkb); /* undoes create_lkb() */
+		goto out_list;
+	}
+
 	_request_lock(r, lkb);
 
+ out_list:
 	if (!ret_nodeid)
 		process_lookup_list(r);
-
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
- out:
 	dlm_put_lkb(lkb);
 }
 
@@ -3153,9 +3419,9 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
 	if (middle_conversion(lkb)) {
 		hold_lkb(lkb);
+		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
 		ls->ls_stub_ms.m_result = -EINPROGRESS;
 		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
-		_remove_from_waiters(lkb);
 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
 
 		/* Same special case as in receive_rcom_lock_args() */
@@ -3227,18 +3493,18 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 
 		case DLM_MSG_UNLOCK:
 			hold_lkb(lkb);
+			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
 			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
-			_remove_from_waiters(lkb);
 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
 			dlm_put_lkb(lkb);
 			break;
 
 		case DLM_MSG_CANCEL:
 			hold_lkb(lkb);
+			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
 			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
-			_remove_from_waiters(lkb);
 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
 			dlm_put_lkb(lkb);
 			break;
@@ -3252,37 +3518,47 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 	mutex_unlock(&ls->ls_waiters_mutex);
 }
 
-static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
+static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb;
-	int rv = 0;
+	int found = 0;
 
 	mutex_lock(&ls->ls_waiters_mutex);
 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
-			rv = lkb->lkb_wait_type;
-			_remove_from_waiters(lkb);
-			lkb->lkb_flags &= ~DLM_IFL_RESEND;
+			hold_lkb(lkb);
+			found = 1;
 			break;
 		}
 	}
 	mutex_unlock(&ls->ls_waiters_mutex);
 
-	if (!rv)
+	if (!found)
 		lkb = NULL;
-	*lkb_ret = lkb;
-	return rv;
+	return lkb;
 }
 
 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
    master or dir-node for r.  Processing the lkb may result in it being placed
    back on waiters. */
 
+/* We do this after normal locking has been enabled and any saved messages
+   (in requestqueue) have been processed.  We should be confident that at
+   this point we won't get or process a reply to any of these waiting
+   operations.  But, new ops may be coming in on the rsbs/locks here from
+   userspace or remotely. */
+
+/* there may have been an overlap unlock/cancel prior to recovery or after
+   recovery.  if before, the lkb may still have a pos wait_count; if after, the
+   overlap flag would just have been set and nothing new sent.  we can be
+   confident here that any replies to either the initial op or overlap ops
+   prior to recovery have been received. */
+
 int dlm_recover_waiters_post(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb;
 	struct dlm_rsb *r;
-	int error = 0, mstype;
+	int error = 0, mstype, err, oc, ou;
 
 	while (1) {
 		if (dlm_locking_stopped(ls)) {
@@ -3291,48 +3567,78 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 			break;
 		}
 
-		mstype = remove_resend_waiter(ls, &lkb);
-		if (!mstype)
+		lkb = find_resend_waiter(ls);
+		if (!lkb)
 			break;
 
 		r = lkb->lkb_resource;
+		hold_rsb(r);
+		lock_rsb(r);
+
+		mstype = lkb->lkb_wait_type;
+		oc = is_overlap_cancel(lkb);
+		ou = is_overlap_unlock(lkb);
+		err = 0;
 
 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
 
-		switch (mstype) {
-
-		case DLM_MSG_LOOKUP:
-			hold_rsb(r);
-			lock_rsb(r);
-			_request_lock(r, lkb);
-			if (is_master(r))
-				confirm_master(r, 0);
-			unlock_rsb(r);
-			put_rsb(r);
-			break;
-
-		case DLM_MSG_REQUEST:
-			hold_rsb(r);
-			lock_rsb(r);
-			_request_lock(r, lkb);
-			if (is_master(r))
-				confirm_master(r, 0);
-			unlock_rsb(r);
-			put_rsb(r);
-			break;
-
-		case DLM_MSG_CONVERT:
-			hold_rsb(r);
-			lock_rsb(r);
-			_convert_lock(r, lkb);
-			unlock_rsb(r);
-			put_rsb(r);
-			break;
-
-		default:
-			log_error(ls, "recover_waiters_post type %d", mstype);
+		/* At this point we assume that we won't get a reply to any
+		   previous op or overlap op on this lock.  First, do a big
+		   remove_from_waiters() for all previous ops. */
+
+		lkb->lkb_flags &= ~DLM_IFL_RESEND;
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+		lkb->lkb_wait_type = 0;
+		lkb->lkb_wait_count = 0;
+		mutex_lock(&ls->ls_waiters_mutex);
+		list_del_init(&lkb->lkb_wait_reply);
+		mutex_unlock(&ls->ls_waiters_mutex);
+		unhold_lkb(lkb); /* for waiters list */
+
+		if (oc || ou) {
+			/* do an unlock or cancel instead of resending */
+			switch (mstype) {
+			case DLM_MSG_LOOKUP:
+			case DLM_MSG_REQUEST:
+				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
+							-DLM_ECANCEL);
+				unhold_lkb(lkb); /* undoes create_lkb() */
+				break;
+			case DLM_MSG_CONVERT:
+				if (oc) {
+					queue_cast(r, lkb, -DLM_ECANCEL);
+				} else {
+					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
+					_unlock_lock(r, lkb);
+				}
+				break;
+			default:
+				err = 1;
+			}
+		} else {
+			switch (mstype) {
+			case DLM_MSG_LOOKUP:
+			case DLM_MSG_REQUEST:
+				_request_lock(r, lkb);
+				if (is_master(r))
+					confirm_master(r, 0);
+				break;
+			case DLM_MSG_CONVERT:
+				_convert_lock(r, lkb);
+				break;
+			default:
+				err = 1;
+			}
 		}
+
+		if (err)
+			log_error(ls, "recover_waiters_post %x %d %x %d %d",
+			  	  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
+		unlock_rsb(r);
+		put_rsb(r);
+		dlm_put_lkb(lkb);
 	}
 
 	return error;
@@ -3684,7 +3990,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
 
 	/* add this new lkb to the per-process list of locks */
 	spin_lock(&ua->proc->locks_spin);
-	kref_get(&lkb->lkb_ref);
+	hold_lkb(lkb);
 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
 	spin_unlock(&ua->proc->locks_spin);
  out:
@@ -3774,6 +4080,9 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 
 	if (error == -DLM_EUNLOCK)
 		error = 0;
+	/* from validate_unlock_args() */
+	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
+		error = 0;
 	if (error)
 		goto out_put;
 
@@ -3786,6 +4095,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 	dlm_put_lkb(lkb);
  out:
 	unlock_recovery(ls);
+	kfree(ua_tmp);
 	return error;
 }
 
@@ -3815,33 +4125,37 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 
 	if (error == -DLM_ECANCEL)
 		error = 0;
-	if (error)
-		goto out_put;
-
-	/* this lkb was removed from the WAITING queue */
-	if (lkb->lkb_grmode == DLM_LOCK_IV) {
-		spin_lock(&ua->proc->locks_spin);
-		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
-		spin_unlock(&ua->proc->locks_spin);
-	}
+	/* from validate_unlock_args() */
+	if (error == -EBUSY)
+		error = 0;
  out_put:
 	dlm_put_lkb(lkb);
  out:
 	unlock_recovery(ls);
+	kfree(ua_tmp);
 	return error;
 }
 
+/* lkb's that are removed from the waiters list by revert are just left on the
+   orphans list with the granted orphan locks, to be freed by purge */
+
 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
+	struct dlm_args args;
+	int error;
 
-	if (ua->lksb.sb_lvbptr)
-		kfree(ua->lksb.sb_lvbptr);
-	kfree(ua);
-	lkb->lkb_astparam = (long)NULL;
+	hold_lkb(lkb);
+	mutex_lock(&ls->ls_orphans_mutex);
+	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
+	mutex_unlock(&ls->ls_orphans_mutex);
 
-	/* TODO: propogate to master if needed */
-	return 0;
+	set_unlock_args(0, ua, &args);
+
+	error = cancel_lock(ls, lkb, &args);
+	if (error == -DLM_ECANCEL)
+		error = 0;
+	return error;
 }
 
 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
@@ -3853,10 +4167,6 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
 	struct dlm_args args;
 	int error;
 
-	/* FIXME: we need to handle the case where the lkb is in limbo
-	   while the rsb is being looked up, currently we assert in
-	   _unlock_lock/is_remote because rsb nodeid is -1. */
-
 	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
 
 	error = unlock_lock(ls, lkb, &args);
@@ -3865,6 +4175,31 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
 	return error;
 }
 
+/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
+   (which does lock_rsb) due to deadlock with receiving a message that does
+   lock_rsb followed by dlm_user_add_ast() */
+
+static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
+				     struct dlm_user_proc *proc)
+{
+	struct dlm_lkb *lkb = NULL;
+
+	mutex_lock(&ls->ls_clear_proc_locks);
+	if (list_empty(&proc->locks))
+		goto out;
+
+	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
+	list_del_init(&lkb->lkb_ownqueue);
+
+	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
+		lkb->lkb_flags |= DLM_IFL_ORPHAN;
+	else
+		lkb->lkb_flags |= DLM_IFL_DEAD;
+ out:
+	mutex_unlock(&ls->ls_clear_proc_locks);
+	return lkb;
+}
+
 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
    which we clear here. */
@@ -3880,18 +4215,15 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 	struct dlm_lkb *lkb, *safe;
 
 	lock_recovery(ls);
-	mutex_lock(&ls->ls_clear_proc_locks);
 
-	list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
-		list_del_init(&lkb->lkb_ownqueue);
-
-		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
-			lkb->lkb_flags |= DLM_IFL_ORPHAN;
+	while (1) {
+		lkb = del_proc_lock(ls, proc);
+		if (!lkb)
+			break;
+		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
 			orphan_proc_lock(ls, lkb);
-		} else {
-			lkb->lkb_flags |= DLM_IFL_DEAD;
+		else
 			unlock_proc_lock(ls, lkb);
-		}
 
 		/* this removes the reference for the proc->locks list
 		   added by dlm_user_request, it may result in the lkb
@@ -3900,6 +4232,8 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 		dlm_put_lkb(lkb);
 	}
 
+	mutex_lock(&ls->ls_clear_proc_locks);
+
 	/* in-progress unlocks */
 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
 		list_del_init(&lkb->lkb_ownqueue);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index f40817b..f607ca2 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -2,7 +2,7 @@

*******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -459,6 +459,8 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
 
 	INIT_LIST_HEAD(&ls->ls_waiters);
 	mutex_init(&ls->ls_waiters_mutex);
+	INIT_LIST_HEAD(&ls->ls_orphans);
+	mutex_init(&ls->ls_orphans_mutex);
 
 	INIT_LIST_HEAD(&ls->ls_nodes);
 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 27a75ce..c978c67 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2006 Red Hat, Inc.  All rights reserved.
+ * Copyright (C) 2006-2007 Red Hat, Inc.  All rights reserved.
  *
  * This copyrighted material is made available to anyone wishing to use,
  * modify, copy, or redistribute it subject to the terms and conditions
@@ -128,35 +128,30 @@ static void compat_output(struct dlm_lock_result *res,
 }
 #endif
 
+/* we could possibly check if the cancel of an orphan has resulted in the lkb
+   being removed and then remove that lkb from the orphans list and free it */
 
 void dlm_user_add_ast(struct dlm_lkb *lkb, int type)
 {
 	struct dlm_ls *ls;
 	struct dlm_user_args *ua;
 	struct dlm_user_proc *proc;
-	int remove_ownqueue = 0;
+	int eol = 0, ast_type;
 
-	/* dlm_clear_proc_locks() sets ORPHAN/DEAD flag on each
-	   lkb before dealing with it.  We need to check this
-	   flag before taking ls_clear_proc_locks mutex because if
-	   it's set, dlm_clear_proc_locks() holds the mutex. */
-
-	if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) {
-		/* log_print("user_add_ast skip1 %x", lkb->lkb_flags); */
+	if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD))
 		return;
-	}
 
 	ls = lkb->lkb_resource->res_ls;
 	mutex_lock(&ls->ls_clear_proc_locks);
 
 	/* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
 	   can't be delivered.  For ORPHAN's, dlm_clear_proc_locks() freed
-	   lkb->ua so we can't try to use it. */
+	   lkb->ua so we can't try to use it.  This second check is necessary
+	   for cases where a completion ast is received for an operation that
+	   began before clear_proc_locks did its cancel/unlock. */
 
-	if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) {
-		/* log_print("user_add_ast skip2 %x", lkb->lkb_flags); */
+	if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD))
 		goto out;
-	}
 
 	DLM_ASSERT(lkb->lkb_astparam, dlm_print_lkb(lkb););
 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
@@ -166,28 +161,42 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, int type)
 		goto out;
 
 	spin_lock(&proc->asts_spin);
-	if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) {
+
+	ast_type = lkb->lkb_ast_type;
+	lkb->lkb_ast_type |= type;
+
+	if (!ast_type) {
 		kref_get(&lkb->lkb_ref);
 		list_add_tail(&lkb->lkb_astqueue, &proc->asts);
-		lkb->lkb_ast_type |= type;
 		wake_up_interruptible(&proc->wait);
 	}
-
-	/* noqueue requests that fail may need to be removed from the
-	   proc's locks list, there should be a better way of detecting
-	   this situation than checking all these things... */
-
-	if (type == AST_COMP && lkb->lkb_grmode == DLM_LOCK_IV &&
-	    ua->lksb.sb_status == -EAGAIN && !list_empty(&lkb->lkb_ownqueue))
-		remove_ownqueue = 1;
-
-	/* unlocks or cancels of waiting requests need to be removed from the
-	   proc's unlocking list, again there must be a better way...  */
-
-	if (ua->lksb.sb_status == -DLM_EUNLOCK ||
+	if (type == AST_COMP && (ast_type & AST_COMP))
+		log_debug(ls, "ast overlap %x status %x %x",
+			  lkb->lkb_id, ua->lksb.sb_status, lkb->lkb_flags);
+
+	/* Figure out if this lock is at the end of its life and no longer
+	   available for the application to use.  The lkb still exists until
+	   the final ast is read.  A lock becomes EOL in three situations:
+	     1. a noqueue request fails with EAGAIN
+	     2. an unlock completes with EUNLOCK
+	     3. a cancel of a waiting request completes with ECANCEL
+	   An EOL lock needs to be removed from the process's list of locks.
+	   And we can't allow any new operation on an EOL lock.  This is
+	   not related to the lifetime of the lkb struct which is managed
+	   entirely by refcount. */
+
+	if (type == AST_COMP &&
+	    lkb->lkb_grmode == DLM_LOCK_IV &&
+	    ua->lksb.sb_status == -EAGAIN)
+		eol = 1;
+	else if (ua->lksb.sb_status == -DLM_EUNLOCK ||
 	    (ua->lksb.sb_status == -DLM_ECANCEL &&
 	     lkb->lkb_grmode == DLM_LOCK_IV))
-		remove_ownqueue = 1;
+		eol = 1;
+	if (eol) {
+		lkb->lkb_ast_type &= ~AST_BAST;
+		lkb->lkb_flags |= DLM_IFL_ENDOFLIFE;
+	}
 
 	/* We want to copy the lvb to userspace when the completion
 	   ast is read if the status is 0, the lock has an lvb and
@@ -204,11 +213,13 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, int type)
 
 	spin_unlock(&proc->asts_spin);
 
-	if (remove_ownqueue) {
+	if (eol) {
 		spin_lock(&ua->proc->locks_spin);
-		list_del_init(&lkb->lkb_ownqueue);
+		if (!list_empty(&lkb->lkb_ownqueue)) {
+			list_del_init(&lkb->lkb_ownqueue);
+			dlm_put_lkb(lkb);
+		}
 		spin_unlock(&ua->proc->locks_spin);
-		dlm_put_lkb(lkb);
 	}
  out:
 	mutex_unlock(&ls->ls_clear_proc_locks);
-- 
1.5.1.2




  parent reply	other threads:[~2007-05-01 10:09 UTC|newest]

Thread overview: 77+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2007-05-01  9:56 [Cluster-devel] [GFS2] Patches for the current merge window [0/34] Steven Whitehouse
2007-05-01  9:56 ` Steven Whitehouse
2007-05-01  9:58 ` [Cluster-devel] [GFS2] Add gfs2_tool lockdump support to gfs2 (bz 228540) [1/34] Steven Whitehouse
2007-05-01  9:59 ` [Cluster-devel] [GFS2] fix bz 231369, gfs2 will oops if you specify an invalid mount option [2/34] Steven Whitehouse
2007-05-01  9:59   ` Steven Whitehouse
2007-05-01 10:28   ` [Cluster-devel] " Christoph Hellwig
2007-05-01 10:28     ` Christoph Hellwig
2007-05-01 13:41     ` [Cluster-devel] " Steven Whitehouse
2007-05-01 13:41       ` Steven Whitehouse
2007-05-01  9:59 ` [Cluster-devel] [DLM] Fix uninitialised variable in receiving [3/34] Steven Whitehouse
2007-05-01  9:59   ` Steven Whitehouse
2007-05-01 10:01 ` [Cluster-devel] [GFS2] Fix bz 231380, unlock page before dequeing glocks in gfs2_commit_write [4/34] Steven Whitehouse
2007-05-01 10:01   ` Steven Whitehouse
2007-05-01 10:02 ` [Cluster-devel] [GFS2] Fix bz 224480 and cleanup glock demotion code [5/34] Steven Whitehouse
2007-05-01 10:02   ` Steven Whitehouse
2007-05-01 10:02 ` [Cluster-devel] [GFS2] Fix a bug on i386 due to evaluation order [6/34] Steven Whitehouse
2007-05-01 10:02   ` Steven Whitehouse
2007-05-01 10:03 ` [Cluster-devel] [DLM] Don't delete misc device if lockspace removal fails [7/43] Steven Whitehouse
2007-05-01 10:03   ` Steven Whitehouse
2007-05-01 10:04 ` [Cluster-devel] [GFS2] Speed up lock_dlm's locking (move sprintf) [8/34] Steven Whitehouse
2007-05-01 10:04   ` Steven Whitehouse
2007-05-01 10:05 ` [Cluster-devel] [GFS2] Fix log entry list corruption [9/34] Steven Whitehouse
2007-05-01 10:05   ` Steven Whitehouse
2007-05-01 10:06 ` [Cluster-devel] [GFS2] flush the log if a transaction can't allocate space [10/34] Steven Whitehouse
2007-05-01 10:06   ` Steven Whitehouse
2007-05-01 10:07 ` [Cluster-devel] [GFS2] Red Hat bz 228540: owner references [11/34] Steven Whitehouse
2007-05-01 10:07   ` Steven Whitehouse
2007-05-01 10:07 ` [Cluster-devel] [DLM] fix coverity-spotted stupidity [12/34] Steven Whitehouse
2007-05-01 10:07   ` Steven Whitehouse
2007-05-01 10:09 ` Steven Whitehouse [this message]
2007-05-01 10:09   ` [DLM] overlapping cancel and unlock [13/34] Steven Whitehouse
2007-05-01 10:09 ` [Cluster-devel] [GFS2] use log_error before LM_OUT_ERROR [14/34] Steven Whitehouse
2007-05-01 10:09   ` Steven Whitehouse
2007-05-01 10:10 ` [Cluster-devel] [GFS2] Set drop_count to 0 (off) by default [15/34] Steven Whitehouse
2007-05-01 10:10   ` Steven Whitehouse
2007-05-01 10:11 ` [Cluster-devel] [DLM] split create_message function [16/34] Steven Whitehouse
2007-05-01 10:11   ` Steven Whitehouse
2007-05-01 10:12 ` [Cluster-devel] [DLM] add orphan purging code (1/2) [17/34] Steven Whitehouse
2007-05-01 10:12   ` Steven Whitehouse
2007-05-01 10:12 ` [Cluster-devel] [DLM] interface for purge (2/2) [18/34] Steven Whitehouse
2007-05-01 10:12   ` Steven Whitehouse
2007-05-01 10:13 ` [Cluster-devel] [DLM] change lkid format [19/34] Steven Whitehouse
2007-05-01 10:13   ` Steven Whitehouse
2007-05-01 10:14 ` [Cluster-devel] " Steven Whitehouse
2007-05-01 10:14   ` Steven Whitehouse
2007-05-01 10:14 ` [Cluster-devel] GFS2] Fix bz 234168 (ignoring rgrp flags) [20/34] Steven Whitehouse
2007-05-01 10:14   ` Steven Whitehouse
2007-05-01 10:15 ` [Cluster-devel] [DLM] Remove redundant assignment [21/34] Steven Whitehouse
2007-05-01 10:15   ` Steven Whitehouse
2007-05-01 10:16 ` [Cluster-devel] [DLM] Consolidate transport protocols [21/34] Steven Whitehouse
2007-05-01 10:16   ` Steven Whitehouse
2007-05-01 10:17 ` [Cluster-devel] DLM] fs/dlm/ast.c should #include "ast.h" [23/34] Steven Whitehouse
2007-05-01 10:17   ` Steven Whitehouse
2007-05-01 10:18 ` [Cluster-devel] [GFS2] bz 236008: Kernel gpf doing cat /debugfs/gfs2/xxx (lock dump) [24/34] Steven Whitehouse
2007-05-01 10:18   ` Steven Whitehouse
2007-05-01 10:19 ` [Cluster-devel] [GFS2] Patch to detect corrupt number of dir entries in leaf and/or inode blocks [25/34] Steven Whitehouse
2007-05-01 10:19   ` Steven Whitehouse
2007-05-01 10:20 ` [Cluster-devel] [GFS2] lockdump improvements [26/34] Steven Whitehouse
2007-05-01 10:20   ` Steven Whitehouse
2007-05-01 10:20 ` [Cluster-devel] [DLM] fix mode munging [27/34] Steven Whitehouse
2007-05-01 10:20   ` Steven Whitehouse
2007-05-01 10:21 ` [Cluster-devel] [DLM] Fix dlm_lowcoms_stop hang [28/34] Steven Whitehouse
2007-05-01 10:21   ` Steven Whitehouse
2007-05-01 10:22 ` [Cluster-devel] [DLM] Lowcomms nodeid range & initialisation fixes [29/34] Steven Whitehouse
2007-05-01 10:22   ` Steven Whitehouse
2007-05-01 10:22 ` [Cluster-devel] [GFS2] use lib/parser for parsing mount options [30/34] Steven Whitehouse
2007-05-01 10:22   ` Steven Whitehouse
2007-05-01 10:23 ` [Cluster-devel] [GFS2] Patch to fix mmap of stuffed files [31/34] Steven Whitehouse
2007-05-01 10:23   ` Steven Whitehouse
2007-05-01 10:24 ` [Cluster-devel] [GFS2] printk warning fixes [32/34] Steven Whitehouse
2007-05-01 10:24   ` Steven Whitehouse
2007-05-01 10:24 ` [Cluster-devel] [DLM] lowcomms style [33/34] Steven Whitehouse
2007-05-01 10:24   ` Steven Whitehouse
2007-05-01 10:25 ` [Cluster-devel] [GFS2] Uncomment sprintf_symbol calling code [34/34] Steven Whitehouse
2007-05-01 10:25   ` Steven Whitehouse
2007-05-01 14:11 ` [Cluster-devel] [GFS2/DLM] Pull request Steven Whitehouse
2007-05-01 14:11   ` Steven Whitehouse

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1178014152.5462.153.camel@quoit.chygwyn.com \
    --to=swhiteho@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.