cluster-devel.redhat.com archive mirror
* [Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree
       [not found] <9ba880ab-984b-4588-b2cb-04089b0943ee@zmail06.collab.prod.int.phx2.redhat.com>
@ 2011-10-05 19:25 ` Bob Peterson
  2011-10-05 20:05   ` David Teigland
  0 siblings, 1 reply; 11+ messages in thread
From: Bob Peterson @ 2011-10-05 19:25 UTC (permalink / raw)
  To: cluster-devel.redhat.com

Hi,

This upstream patch changes the way DLM keeps track of RSBs.
Before, they were in a linked list off a hash table.  Now,
they're an rb_tree off the same hash table.  This speeds up
DLM lookups greatly.

Today's DLM is faster than older DLMs for many file systems,
(e.g. in RHEL5) due to the larger hash table size.  However,
this rb_tree implementation scales much better.  For my
1000-directories-with-1000-files test, the patch doesn't
show much of an improvement.  But when I scale the file system
to 4000 directories with 4000 files (16 million files), it
helps greatly. The time to do rm -fR /mnt/gfs2/* drops from
42.01 hours to 23.68 hours.
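
As a rough back-of-the-envelope illustration (assuming the default
1024 hash buckets and on the order of one rsb per file): 16 million
rsbs work out to roughly 15,600 rsbs per bucket.  A linked list
lookup walks half the chain on average, so ~7,800 memcmp() calls,
while an rb_tree on the same bucket needs only about log2(15,600),
i.e. ~14 comparisons.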

With this patch I believe we could also reduce the size of
the hash table again or eliminate it completely, but we can
evaluate and do that later.

NOTE: Today's upstream DLM code has special code to
pre-allocate RSB structures for faster lookup.  This patch
eliminates that step, since it doesn't have a resource name
at that time for inserting new entries in the rb_tree.

Regards,

Bob Peterson
Red Hat File Systems

Signed-off-by: Bob Peterson <rpeterso@redhat.com> 
--
commit 4f65e6021b3b5582c731542552914a3f6865f8a5
Author: Bob Peterson <rpeterso@redhat.com>
Date:   Wed Oct 5 10:04:39 2011 -0500

    DLM: Convert rsb data from linked list to rb_tree
    
    This patch changes the way DLM keeps track of rsbs.  Before, they
    were in a linked list off a hash table.  Now, they're an rb_tree
    off the same hash table.  This speeds up DLM lookups greatly.

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 5977923..f2f4150 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -393,9 +393,10 @@ static const struct seq_operations format3_seq_ops;
 
 static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 {
+	struct rb_node *node = NULL;
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri;
-	struct dlm_rsb *r;
+	struct dlm_rsb *r = NULL;
 	loff_t n = *pos;
 	unsigned bucket, entry;
 
@@ -418,9 +419,10 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		ri->format = 3;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-		list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list,
-				    res_hashchain) {
+	if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+		node = rb_first(&ls->ls_rsbtbl[bucket].keep);
+		while (node) {
+			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			if (!entry--) {
 				dlm_hold_rsb(r);
 				ri->rsb = r;
@@ -428,6 +430,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 				spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 				return ri;
 			}
+			node = rb_next(&r->res_hashnode);
 		}
 	}
 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -449,9 +452,8 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		}
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-			r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-					     struct dlm_rsb, res_hashchain);
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
@@ -467,7 +469,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 {
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri = iter_ptr;
-	struct list_head *next;
+	struct rb_node *next;
 	struct dlm_rsb *r, *rp;
 	loff_t n = *pos;
 	unsigned bucket;
@@ -480,10 +482,10 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
 	rp = ri->rsb;
-	next = rp->res_hashchain.next;
+	next = rb_next(&rp->res_hashnode);
 
-	if (next != &ls->ls_rsbtbl[bucket].list) {
-		r = list_entry(next, struct dlm_rsb, res_hashchain);
+	if (next) {
+		r = rb_entry(next, struct dlm_rsb, res_hashnode);
 		dlm_hold_rsb(r);
 		ri->rsb = r;
 		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -511,9 +513,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 		}
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-			r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-					     struct dlm_rsb, res_hashchain);
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+			next = rb_first(&ls->ls_rsbtbl[bucket].keep);
+			r = rb_entry(next, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index fe2860c..fd7da74 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -103,8 +103,8 @@ struct dlm_dirtable {
 };
 
 struct dlm_rsbtable {
-	struct list_head	list;
-	struct list_head	toss;
+	struct rb_root		keep;
+	struct rb_root		toss;
 	spinlock_t		lock;
 };
 
@@ -285,7 +285,7 @@ struct dlm_rsb {
 	unsigned long		res_toss_time;
 	uint32_t		res_first_lkid;
 	struct list_head	res_lookup;	/* lkbs waiting on first */
-	struct list_head	res_hashchain;	/* rsbtbl */
+	struct rb_node		res_hashnode;	/* rsbtbl */
 	struct list_head	res_grantqueue;
 	struct list_head	res_convertqueue;
 	struct list_head	res_waitqueue;
@@ -479,10 +479,6 @@ struct dlm_ls {
 	struct mutex		ls_timeout_mutex;
 	struct list_head	ls_timeout;
 
-	spinlock_t		ls_new_rsb_spin;
-	int			ls_new_rsb_count;
-	struct list_head	ls_new_rsb;	/* new rsb structs */
-
 	struct list_head	ls_nodes;	/* current nodes in ls */
 	struct list_head	ls_nodes_gone;	/* dead node list, recovery */
 	int			ls_num_nodes;	/* number of nodes in ls */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 83b5e32..79789d1 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -56,6 +56,7 @@
    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
 */
 #include <linux/types.h>
+#include <linux/rbtree.h>
 #include <linux/slab.h>
 #include "dlm_internal.h"
 #include <linux/dlm_device.h>
@@ -327,38 +328,6 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
  * Basic operations on rsb's and lkb's
  */
 
-static int pre_rsb_struct(struct dlm_ls *ls)
-{
-	struct dlm_rsb *r1, *r2;
-	int count = 0;
-
-	spin_lock(&ls->ls_new_rsb_spin);
-	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
-		spin_unlock(&ls->ls_new_rsb_spin);
-		return 0;
-	}
-	spin_unlock(&ls->ls_new_rsb_spin);
-
-	r1 = dlm_allocate_rsb(ls);
-	r2 = dlm_allocate_rsb(ls);
-
-	spin_lock(&ls->ls_new_rsb_spin);
-	if (r1) {
-		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
-		ls->ls_new_rsb_count++;
-	}
-	if (r2) {
-		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
-		ls->ls_new_rsb_count++;
-	}
-	count = ls->ls_new_rsb_count;
-	spin_unlock(&ls->ls_new_rsb_spin);
-
-	if (!count)
-		return -ENOMEM;
-	return 0;
-}
-
 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
    unlock any spinlocks, go back and call pre_rsb_struct again.
    Otherwise, take an rsb off the list and return it. */
@@ -367,28 +336,16 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 			  struct dlm_rsb **r_ret)
 {
 	struct dlm_rsb *r;
-	int count;
 
-	spin_lock(&ls->ls_new_rsb_spin);
-	if (list_empty(&ls->ls_new_rsb)) {
-		count = ls->ls_new_rsb_count;
-		spin_unlock(&ls->ls_new_rsb_spin);
-		log_debug(ls, "find_rsb retry %d %d %s",
-			  count, dlm_config.ci_new_rsb_count, name);
-		return -EAGAIN;
-	}
-
-	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
-	list_del(&r->res_hashchain);
-	ls->ls_new_rsb_count--;
-	spin_unlock(&ls->ls_new_rsb_spin);
+	r = dlm_allocate_rsb(ls);
+	if (!r)
+		return -ENOMEM;
 
 	r->res_ls = ls;
 	r->res_length = len;
 	memcpy(r->res_name, name, len);
 	mutex_init(&r->res_mutex);
 
-	INIT_LIST_HEAD(&r->res_hashchain);
 	INIT_LIST_HEAD(&r->res_lookup);
 	INIT_LIST_HEAD(&r->res_grantqueue);
 	INIT_LIST_HEAD(&r->res_convertqueue);
@@ -400,42 +357,76 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 	return 0;
 }
 
-static int search_rsb_list(struct list_head *head, char *name, int len,
+static int search_rsb_tree(struct rb_root *tree, char *name, int len,
 			   unsigned int flags, struct dlm_rsb **r_ret)
 {
-	struct dlm_rsb *r;
-	int error = 0;
-
-	list_for_each_entry(r, head, res_hashchain) {
-		if (len == r->res_length && !memcmp(name, r->res_name, len))
+	struct rb_node **node = &tree->rb_node;
+	struct dlm_rsb *rsb = NULL;
+	int rc;
+
+	while (*node) {
+		rsb = rb_entry(*node, struct dlm_rsb, res_hashnode);
+		rc = memcmp(name, rsb->res_name, len);
+		if (rc < 0)
+			node = &((*node)->rb_left);
+		else if (rc > 0)
+			node = &((*node)->rb_right);
+		else
 			goto found;
 	}
 	*r_ret = NULL;
 	return -EBADR;
 
  found:
-	if (r->res_nodeid && (flags & R_MASTER))
-		error = -ENOTBLK;
-	*r_ret = r;
-	return error;
+	*r_ret = rsb;
+	if (rsb->res_nodeid && (flags & R_MASTER))
+		return -ENOTBLK;
+
+	return 0;
+}
+
+static void rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
+{
+	struct rb_node **newn = &tree->rb_node, *parent = NULL;
+	int rc;
+	const char *name = rsb->res_name; /* This is just for clarity */
+
+	/* Figure out where to put new node */
+	while (*newn) {
+		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
+					       res_hashnode);
+
+		parent = *newn;
+		rc = memcmp(name, cur->res_name, cur->res_length);
+		if (rc < 0)
+			newn = &((*newn)->rb_left);
+		else if (rc > 0)
+			newn = &((*newn)->rb_right);
+		else
+			return;
+	}
+
+	rb_link_node(&rsb->res_hashnode, parent, newn);
+	rb_insert_color(&rsb->res_hashnode, tree);
 }
 
 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 		       unsigned int flags, struct dlm_rsb **r_ret)
 {
-	struct dlm_rsb *r;
+	struct dlm_rsb *r = NULL;
 	int error;
 
-	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
+	error = search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
 	if (!error) {
 		kref_get(&r->res_ref);
 		goto out;
 	}
-	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
+	error = search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
 	if (error)
 		goto out;
 
-	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+	rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 
 	if (dlm_no_directory(ls))
 		goto out;
@@ -487,13 +478,6 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 	hash = jhash(name, namelen, 0);
 	bucket = hash & (ls->ls_rsbtbl_size - 1);
 
- retry:
-	if (flags & R_CREATE) {
-		error = pre_rsb_struct(ls);
-		if (error < 0)
-			goto out;
-	}
-
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
 
 	error = _search_rsb(ls, name, namelen, bucket, flags, &r);
@@ -508,10 +492,6 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 		goto out_unlock;
 
 	error = get_rsb_struct(ls, name, namelen, &r);
-	if (error == -EAGAIN) {
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
-		goto retry;
-	}
 	if (error)
 		goto out_unlock;
 
@@ -527,7 +507,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 			nodeid = 0;
 		r->res_nodeid = nodeid;
 	}
-	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
+	rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
 	error = 0;
  out_unlock:
 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -556,7 +536,8 @@ static void toss_rsb(struct kref *kref)
 
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 	kref_init(&r->res_ref);
-	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
+	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
 	r->res_toss_time = jiffies;
 	if (r->res_lvbptr) {
 		dlm_free_lvb(r->res_lvbptr);
@@ -1087,19 +1068,26 @@ static void dir_remove(struct dlm_rsb *r)
 
 static int shrink_bucket(struct dlm_ls *ls, int b)
 {
+	struct rb_node *n = NULL;
 	struct dlm_rsb *r;
 	int count = 0, found;
 
 	for (;;) {
 		found = 0;
 		spin_lock(&ls->ls_rsbtbl[b].lock);
-		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
-					    res_hashchain) {
-			if (!time_after_eq(jiffies, r->res_toss_time +
-					   dlm_config.ci_toss_secs * HZ))
-				continue;
-			found = 1;
-			break;
+		n = rb_first(&ls->ls_rsbtbl[b].toss);
+		while (n) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+			if (unlikely(&r->res_hashnode == n)) {
+				spin_unlock(&ls->ls_rsbtbl[b].lock);
+				return 0;
+			}
+			if (time_after_eq(jiffies, r->res_toss_time +
+					  dlm_config.ci_toss_secs * HZ)) {
+				found = 1;
+				break;
+			}
+			n = rb_next(&r->res_hashnode);
 		}
 
 		if (!found) {
@@ -1108,7 +1096,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
 		}
 
 		if (kref_put(&r->res_ref, kill_rsb)) {
-			list_del(&r->res_hashchain);
+			rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
 			spin_unlock(&ls->ls_rsbtbl[b].lock);
 
 			if (is_master(r))
@@ -4441,16 +4429,20 @@ int dlm_purge_locks(struct dlm_ls *ls)
 
 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
 {
+	struct rb_node *n = NULL;
 	struct dlm_rsb *r, *r_ret = NULL;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
-		if (!rsb_flag(r, RSB_LOCKS_PURGED))
-			continue;
-		hold_rsb(r);
-		rsb_clear_flag(r, RSB_LOCKS_PURGED);
-		r_ret = r;
-		break;
+	n = rb_first(&ls->ls_rsbtbl[bucket].keep);
+	while (n) {
+		r = rb_entry(n, struct dlm_rsb, res_hashnode);
+		if (rsb_flag(r, RSB_LOCKS_PURGED)) {
+			hold_rsb(r);
+			rsb_clear_flag(r, RSB_LOCKS_PURGED);
+			r_ret = r;
+			break;
+		}
+		n = rb_next(&r->res_hashnode);
 	}
 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 	return r_ret;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index a1d8f1a..0bee1c7 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -457,8 +457,8 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
 	if (!ls->ls_rsbtbl)
 		goto out_lsfree;
 	for (i = 0; i < size; i++) {
-		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
-		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
+		ls->ls_rsbtbl[i].keep.rb_node = NULL;
+		ls->ls_rsbtbl[i].toss.rb_node = NULL;
 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
 	}
 
@@ -483,9 +483,6 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
 	INIT_LIST_HEAD(&ls->ls_timeout);
 	mutex_init(&ls->ls_timeout_mutex);
 
-	INIT_LIST_HEAD(&ls->ls_new_rsb);
-	spin_lock_init(&ls->ls_new_rsb_spin);
-
 	INIT_LIST_HEAD(&ls->ls_nodes);
 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
 	ls->ls_num_nodes = 0;
@@ -685,7 +682,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
 	struct dlm_rsb *rsb;
-	struct list_head *head;
+	struct rb_node *n = NULL;
 	int i, busy, rv;
 
 	busy = lockspace_busy(ls, force);
@@ -746,33 +743,21 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	 */
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		head = &ls->ls_rsbtbl[i].list;
-		while (!list_empty(head)) {
-			rsb = list_entry(head->next, struct dlm_rsb,
-					 res_hashchain);
-
-			list_del(&rsb->res_hashchain);
+		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].keep);
 			dlm_free_rsb(rsb);
 		}
 
-		head = &ls->ls_rsbtbl[i].toss;
-		while (!list_empty(head)) {
-			rsb = list_entry(head->next, struct dlm_rsb,
-					 res_hashchain);
-			list_del(&rsb->res_hashchain);
+		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].toss);
 			dlm_free_rsb(rsb);
 		}
 	}
 
 	vfree(ls->ls_rsbtbl);
 
-	while (!list_empty(&ls->ls_new_rsb)) {
-		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
-				       res_hashchain);
-		list_del(&rsb->res_hashchain);
-		dlm_free_rsb(rsb);
-	}
-
 	/*
 	 * Free structures on any other lists
 	 */
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 1463823..65dff1b 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -715,6 +715,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
 
 int dlm_create_root_list(struct dlm_ls *ls)
 {
+	struct rb_node *n = NULL;
 	struct dlm_rsb *r;
 	int i, error = 0;
 
@@ -727,9 +728,12 @@ int dlm_create_root_list(struct dlm_ls *ls)
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
+		n = rb_first(&ls->ls_rsbtbl[i].keep);
+		while (n) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, &ls->ls_root_list);
 			dlm_hold_rsb(r);
+			n = rb_next(&r->res_hashnode);
 		}
 
 		/* If we're using a directory, add tossed rsbs to the root
@@ -741,9 +745,12 @@ int dlm_create_root_list(struct dlm_ls *ls)
 			continue;
 		}
 
-		list_for_each_entry(r, &ls->ls_rsbtbl[i].toss, res_hashchain) {
+		n = rb_first(&ls->ls_rsbtbl[i].toss);
+		while (n) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, &ls->ls_root_list);
 			dlm_hold_rsb(r);
+			n = rb_next(&r->res_hashnode);
 		}
 		spin_unlock(&ls->ls_rsbtbl[i].lock);
 	}
@@ -771,17 +778,21 @@ void dlm_release_root_list(struct dlm_ls *ls)
 
 void dlm_clear_toss_list(struct dlm_ls *ls)
 {
-	struct dlm_rsb *r, *safe;
+	struct rb_node *node, *next;
+	struct dlm_rsb *rsb;
 	int i;
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss,
-					 res_hashchain) {
-			if (dlm_no_directory(ls) || !is_master(r)) {
-				list_del(&r->res_hashchain);
-				dlm_free_rsb(r);
+		node = rb_first(&ls->ls_rsbtbl[i].toss);
+		while (node) {
+			rsb = rb_entry(node, struct dlm_rsb, res_hashnode);
+			next = rb_next(&rsb->res_hashnode);
+			if (dlm_no_directory(ls) || !is_master(rsb)) {
+				rb_erase(node, &ls->ls_rsbtbl[i].toss);
+				dlm_free_rsb(rsb);
 			}
+			node = next;
 		}
 		spin_unlock(&ls->ls_rsbtbl[i].lock);
 	}




* [Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree
  2011-10-05 19:25 ` [Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree Bob Peterson
@ 2011-10-05 20:05   ` David Teigland
  2011-10-08 10:13     ` Bob Peterson
  0 siblings, 1 reply; 11+ messages in thread
From: David Teigland @ 2011-10-05 20:05 UTC (permalink / raw)
  To: cluster-devel.redhat.com

On Wed, Oct 05, 2011 at 03:25:39PM -0400, Bob Peterson wrote:
> Hi,
> 
> This upstream patch changes the way DLM keeps track of RSBs.
> Before, they were in a linked list off a hash table.  Now,
> they're an rb_tree off the same hash table.  This speeds up
> DLM lookups greatly.
> 
> Today's DLM is faster than older DLMs for many file systems,
> (e.g. in RHEL5) due to the larger hash table size.  However,
> this rb_tree implementation scales much better.  For my
> 1000-directories-with-1000-files test, the patch doesn't
> show much of an improvement.  But when I scale the file system
> to 4000 directories with 4000 files (16 million files), it
> helps greatly. The time to do rm -fR /mnt/gfs2/* drops from
> 42.01 hours to 23.68 hours.

How many hash table buckets were you using in that test?
If it was the default (1024), I'd be interested to know how
16k compares.

> With this patch I believe we could also reduce the size of
> the hash table again or eliminate it completely, but we can
> evaluate and do that later.
> 
> NOTE: Today's upstream DLM code has special code to
> pre-allocate RSB structures for faster lookup.  This patch
> eliminates that step, since it doesn't have a resource name
> at that time for inserting new entries in the rb_tree.

We need to keep that; why do you say there's no resource name?
pre_rsb_struct() and get_rsb_struct() are specially designed to work
as they do because:

> @@ -367,28 +336,16 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
>  			  struct dlm_rsb **r_ret)
> +	r = dlm_allocate_rsb(ls);
> +	if (!r)
> +		return -ENOMEM;

That's not allowed here because a spinlock is held:

>  	spin_lock(&ls->ls_rsbtbl[bucket].lock);
>  
>  	error = _search_rsb(ls, name, namelen, bucket, flags, &r);
> @@ -508,10 +492,6 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
>  		goto out_unlock;
>  
>  	error = get_rsb_struct(ls, name, namelen, &r);
> -	if (error == -EAGAIN) {
> -		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
> -		goto retry;
> -	}
>  	if (error)
>  		goto out_unlock;

If you try to fix the problem above by releasing the spinlock between the
search and the malloc, then you have to repeat the search.  Eliminating
the repeated search is the main reason for pre_rsb/get_rsb.
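
For reference, here is a condensed sketch of that flow (not the
literal upstream code; the R_CREATE checks and most of the error
handling are trimmed for brevity):

static int find_rsb_sketch(struct dlm_ls *ls, char *name, int len,
                           int bucket, unsigned int flags,
                           struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r;
        int error;

 retry:
        /* may allocate/sleep: no spinlock held here */
        error = pre_rsb_struct(ls);
        if (error < 0)
                return error;

        spin_lock(&ls->ls_rsbtbl[bucket].lock);
        error = _search_rsb(ls, name, len, bucket, flags, &r);
        if (!error)
                goto out_unlock;        /* found an existing rsb */

        /* takes an rsb off ls_new_rsb; never sleeps */
        error = get_rsb_struct(ls, name, len, &r);
        if (error == -EAGAIN) {
                /* pool ran dry: drop the lock, refill, redo the search */
                spin_unlock(&ls->ls_rsbtbl[bucket].lock);
                goto retry;
        }
 out_unlock:
        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
        *r_ret = r;
        return error;
}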




* [Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree
  2011-10-05 20:05   ` David Teigland
@ 2011-10-08 10:13     ` Bob Peterson
  2011-10-10 14:43       ` David Teigland
  0 siblings, 1 reply; 11+ messages in thread
From: Bob Peterson @ 2011-10-08 10:13 UTC (permalink / raw)
  To: cluster-devel.redhat.com

----- Original Message -----
| On Wed, Oct 05, 2011 at 03:25:39PM -0400, Bob Peterson wrote:
| > Hi,
| > 
| > This upstream patch changes the way DLM keeps track of RSBs.
| > Before, they were in a linked list off a hash table.  Now,
| > they're an rb_tree off the same hash table.  This speeds up
| > DLM lookups greatly.
| > 
| > Today's DLM is faster than older DLMs for many file systems,
| > (e.g. in RHEL5) due to the larger hash table size.  However,
| > this rb_tree implementation scales much better.  For my
| > 1000-directories-with-1000-files test, the patch doesn't
| > show much of an improvement.  But when I scale the file system
| > to 4000 directories with 4000 files (16 million files), it
| > helps greatly. The time to do rm -fR /mnt/gfs2/* drops from
| > 42.01 hours to 23.68 hours.
| 
| How many hash table buckets were you using in that test?
| If it was the default (1024), I'd be interested to know how
| 16k compares.

Hi,

Interestingly, on the stock 2.6.32-206.el6.x86_64 kernel
and 16K hash buckets, the time was virtually the same as
with my patch: 1405m46.519s (23.43 hours). So perhaps we
should re-evaluate whether we should use the rb_tree
implementation or just increase the hash buckets as needed.
I guess the question is now mainly related to scaling and
memory usage for all those hash tables at this point.
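
(Very roughly, on a 64-bit kernel each struct dlm_rsbtable entry is
two list heads plus a spinlock, call it ~40 bytes, so 16K buckets is
on the order of 640KB per lockspace versus ~40KB for the default
1024 -- and somewhat less again with two rb_root pointers in place of
the two list heads.  Not huge, but it is allocated up front for every
lockspace.)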

Regards,

Bob Peterson
Red Hat File Systems




* [Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree
  2011-10-08 10:13     ` Bob Peterson
@ 2011-10-10 14:43       ` David Teigland
  2011-10-10 15:51         ` Steven Whitehouse
  2011-10-24 19:47         ` Bob Peterson
  0 siblings, 2 replies; 11+ messages in thread
From: David Teigland @ 2011-10-10 14:43 UTC (permalink / raw)
  To: cluster-devel.redhat.com

On Sat, Oct 08, 2011 at 06:13:52AM -0400, Bob Peterson wrote:
> ----- Original Message -----
> | On Wed, Oct 05, 2011 at 03:25:39PM -0400, Bob Peterson wrote:
> | > Hi,
> | > 
> | > This upstream patch changes the way DLM keeps track of RSBs.
> | > Before, they were in a linked list off a hash table.  Now,
> | > they're an rb_tree off the same hash table.  This speeds up
> | > DLM lookups greatly.
> | > 
> | > Today's DLM is faster than older DLMs for many file systems,
> | > (e.g. in RHEL5) due to the larger hash table size.  However,
> | > this rb_tree implementation scales much better.  For my
> | > 1000-directories-with-1000-files test, the patch doesn't
> | > show much of an improvement.  But when I scale the file system
> | > to 4000 directories with 4000 files (16 million files), it
> | > helps greatly. The time to do rm -fR /mnt/gfs2/* drops from
> | > 42.01 hours to 23.68 hours.
> | 
> | How many hash table buckets were you using in that test?
> | If it was the default (1024), I'd be interested to know how
> | 16k compares.
> 
> Hi,
> 
> Interestingly, on the stock 2.6.32-206.el6.x86_64 kernel
> and 16K hash buckets, the time was virtually the same as
> with my patch: 1405m46.519s (23.43 hours). So perhaps we
> should re-evaluate whether we should use the rb_tree
> implementation or just increase the hash buckets as needed.
> I guess the question is now mainly related to scaling and
> memory usage for all those hash tables at this point.

I'm still interested in possibly using an rbtree with fewer hash buckets.

At the same time, I think the bigger problem may be why gfs2 is caching so
many locks in the first place, especially for millions of unlinked files
whose locks will never benefit you again.




* [Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree
  2011-10-10 14:43       ` David Teigland
@ 2011-10-10 15:51         ` Steven Whitehouse
  2011-10-10 17:01           ` David Teigland
  2011-10-24 19:47         ` Bob Peterson
  1 sibling, 1 reply; 11+ messages in thread
From: Steven Whitehouse @ 2011-10-10 15:51 UTC (permalink / raw)
  To: cluster-devel.redhat.com

Hi,

On Mon, 2011-10-10 at 10:43 -0400, David Teigland wrote:
> On Sat, Oct 08, 2011 at 06:13:52AM -0400, Bob Peterson wrote:
> > ----- Original Message -----
> > | On Wed, Oct 05, 2011 at 03:25:39PM -0400, Bob Peterson wrote:
> > | > Hi,
> > | > 
> > | > This upstream patch changes the way DLM keeps track of RSBs.
> > | > Before, they were in a linked list off a hash table.  Now,
> > | > they're an rb_tree off the same hash table.  This speeds up
> > | > DLM lookups greatly.
> > | > 
> > | > Today's DLM is faster than older DLMs for many file systems,
> > | > (e.g. in RHEL5) due to the larger hash table size.  However,
> > | > this rb_tree implementation scales much better.  For my
> > | > 1000-directories-with-1000-files test, the patch doesn't
> > | > show much of an improvement.  But when I scale the file system
> > | > to 4000 directories with 4000 files (16 million files), it
> > | > helps greatly. The time to do rm -fR /mnt/gfs2/* drops from
> > | > 42.01 hours to 23.68 hours.
> > | 
> > | How many hash table buckets were you using in that test?
> > | If it was the default (1024), I'd be interested to know how
> > | 16k compares.
> > 
> > Hi,
> > 
> > Interestingly, on the stock 2.6.32-206.el6.x86_64 kernel
> > and 16K hash buckets, the time was virtually the same as
> > with my patch: 1405m46.519s (23.43 hours). So perhaps we
> > should re-evaluate whether we should use the rb_tree
> > implementation or just increase the hash buckets as needed.
> > I guess the question is now mainly related to scaling and
> > memory usage for all those hash tables at this point.
> 
> I'm still interested in possibly using an rbtree with fewer hash buckets.
> 
> At the same time, I think the bigger problem may be why gfs2 is caching so
> many locks in the first place, especially for millions of unlinked files
> whose locks will never benefit you again.
> 

I doubt that it is caching locks for unlinked files for any great period
of time if there is any memory pressure. It is memory pressure which
instigates the dropping of glocks relating to inodes usually. If the
glock is unlocked then simply dropping the ref count on the glock will
cause the deallocation (dlm lock drop) to occur.

The reason why this tends to be seen at unlink time is just that
unlinking small files is a good way to iterate over a lot of inodes in a
relatively short time period, and thus locking can start to dominate the
timings if it isn't very low latency. I suspect there are lots of other
workloads which would see this (find perhaps?) as well. The faster the
array, the more noticeable the effect is likely to be.

Also, we do get some benefit if glocks are cached after inodes have been
unlinked. If an inode is then recreated in the same disk block, we'll
have all the glocks ready for it, without having to wait for them to be
set up from scratch.

I am slightly concerned that we ought to be testing with much greater
numbers of inodes for these kinds of tests. Considering the memory sizes
of modern servers, having 1m or even 10m files in cache is really not a
lot these days.

There is a second consideration in selecting the data structure for the
dlm lock/resource etc. tables, which is the locking. Using a simple hash
table does make it much easier to use RCU based locking which is a great
advantage if there are a lot of look up operations compared with the
number of insert/remove operations. That becomes a lot more difficult
(and requires a much greater lookup to insert/remove ratio) when trees
are involved.

We have run into the same kind of issues with GFS2's glock hash table.
We firstly reduced the size of the hash buckets to a single pointer so
that we could have a lot more hash heads for the given amount of memory.
Currently we have 2^16 hash table entries.

Also, we are now using RCU in order to reduce the locking overhead as
much as possible. We still have a spinlock for the lru list, but that is
not taken that much by comparison, and there is no locking in the lookup
path now except to disable preempt through the small critical section.
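
As a rough sketch of that read side (generic names, not the actual
glock code):

struct foo {
        struct list_head        list;
        atomic_t                ref;
        u64                     key;
        struct rcu_head         rcu;
};

static struct foo *foo_find(struct list_head *bucket, u64 key)
{
        struct foo *f;

        rcu_read_lock();
        list_for_each_entry_rcu(f, bucket, list) {
                if (f->key != key)
                        continue;
                /* object may be mid-teardown; only return it if we
                   can still take a reference */
                if (!atomic_inc_not_zero(&f->ref))
                        break;
                rcu_read_unlock();
                return f;
        }
        rcu_read_unlock();
        return NULL;
}

Inserts and removals happen under the bucket spinlock with
list_add_rcu()/list_del_rcu(), and the object is freed via call_rcu().
That shape is easy precisely because a hash chain never has to be
rebalanced under a reader's feet.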

I'm not yet convinced that this is enough, even so. I would like to see
a tree based system for the glocks at some future point, but the
complexity of tree based RCU is currently putting me off, despite the
obvious benefits of a log N (where N is the number of items in the tree)
lookup compared with the N/2 average lookup time for a hash chain. We'd
need to add a seqlock on the read side, which would take away some of
the benefit of using RCU in the first place.
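
The read side would end up looking something like this (only a
sketch; __foo_tree_walk() is a hypothetical plain rb_node walk):

static struct foo *foo_tree_find(struct rb_root *root, seqlock_t *lock,
                                 u64 key)
{
        struct foo *f;
        unsigned int seq;

        do {
                seq = read_seqbegin(lock);
                /* the walk may race with a rotation; retry if a
                   writer ran in the meantime */
                f = __foo_tree_walk(root, key);
        } while (read_seqretry(lock, seq));

        return f;
}

so every lookup touches the seqlock's cacheline and can be forced to
retry whenever a writer rebalances the tree, which eats into the gain
from dropping the reader lock.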

However, where we have scored real performance gains is simply by
avoiding looking up the glocks at all. Hence why we cache them in the
inodes, and this is also why the void pointer we pass to the dlm is a
pointer to the glock, so there are then no hash table lookups when we
receive dlm replies. I think there is scope to explore this option in
the dlm in a few places too.

Our most recent expansion of this concept in gfs2 was the addition of a
variable in the inode to cache the last resource group used. This
reduces the number of lookups of the resource group data structure
dramatically during block allocations and deallocations.
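
In sketch form (field and helper names invented for illustration):

static struct rgrp *rgrp_for_block(struct my_inode *ip, u64 block)
{
        struct rgrp *rgd = ip->i_last_rgd;      /* cached from last use */

        if (rgd && block >= rgd->first_block &&
            block < rgd->first_block + rgd->nr_blocks)
                return rgd;                     /* hit: no lookup at all */

        rgd = rgrp_lookup(ip->i_sbd, block);    /* slow path */
        ip->i_last_rgd = rgd;
        return rgd;
}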

Anyway, I just wanted to point out some of the tradeoffs of the possible
choices available in making gfs2/dlm scale to larger numbers of inodes.
I would very much like to have an RCU based rbtree (or similar) which
doesn't require the seq_lock if anybody has suggestions on how to
achieve that,

Steve.






* [Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree
  2011-10-10 15:51         ` Steven Whitehouse
@ 2011-10-10 17:01           ` David Teigland
  2011-10-10 19:00             ` Steven Whitehouse
  0 siblings, 1 reply; 11+ messages in thread
From: David Teigland @ 2011-10-10 17:01 UTC (permalink / raw)
  To: cluster-devel.redhat.com

On Mon, Oct 10, 2011 at 04:51:01PM +0100, Steven Whitehouse wrote:
> Hi,
> 
> On Mon, 2011-10-10 at 10:43 -0400, David Teigland wrote:
> > On Sat, Oct 08, 2011 at 06:13:52AM -0400, Bob Peterson wrote:
> > > ----- Original Message -----
> > > | On Wed, Oct 05, 2011 at 03:25:39PM -0400, Bob Peterson wrote:
> > > | > Hi,
> > > | > 
> > > | > This upstream patch changes the way DLM keeps track of RSBs.
> > > | > Before, they were in a linked list off a hash table.  Now,
> > > | > they're an rb_tree off the same hash table.  This speeds up
> > > | > DLM lookups greatly.
> > > | > 
> > > | > Today's DLM is faster than older DLMs for many file systems,
> > > | > (e.g. in RHEL5) due to the larger hash table size.  However,
> > > | > this rb_tree implementation scales much better.  For my
> > > | > 1000-directories-with-1000-files test, the patch doesn't
> > > | > show much of an improvement.  But when I scale the file system
> > > | > to 4000 directories with 4000 files (16 million files), it
> > > | > helps greatly. The time to do rm -fR /mnt/gfs2/* drops from
> > > | > 42.01 hours to 23.68 hours.
> > > | 
> > > | How many hash table buckets were you using in that test?
> > > | If it was the default (1024), I'd be interested to know how
> > > | 16k compares.
> > > 
> > > Hi,
> > > 
> > > Interestingly, on the stock 2.6.32-206.el6.x86_64 kernel
> > > and 16K hash buckets, the time was virtually the same as
> > > with my patch: 1405m46.519s (23.43 hours). So perhaps we
> > > should re-evaluate whether we should use the rb_tree
> > > implementation or just increase the hash buckets as needed.
> > > I guess the question is now mainly related to scaling and
> > > memory usage for all those hash tables at this point.
> > 
> > I'm still interested in possibly using an rbtree with fewer hash buckets.
> > 
> > At the same time, I think the bigger problem may be why gfs2 is caching so
> > many locks in the first place, especially for millions of unlinked files
> > whose locks will never benefit you again.
> > 
> 
> I doubt that it is caching locks for unlinked files for any great period
> of time if there is any memory pressure. It is memory pressure which
> instigates the dropping of glocks relating to inodes usually. If the
> glock is unlocked then simply dropping the ref count on the glock will
> cause the deallocation (dlm lock drop) to occur.
>
> The reason why this tends to be seen at unlink time is just that
> unlinking small files is a good way to iterate over a lot of inodes in a
> relatively short time period, and thus locking can start to dominate the
> timings if it isn't very low latency. I suspect there are lots of other
> workloads which would see this (find perhaps?) as well. The faster the
> array, the more noticeable the effect is likely to be.
> 
> Also, we do get some benefit if glocks are cached after inodes have been
> unlinked. If an inode is then recreated in the same disk block, we'll
> have all the glocks ready for it, without having to wait for them to be
> set up from scratch.
> 
> I am slightly concerned that we ought to be testing with much greater
> numbers of inodes for these kinds of tests. Considering the memory sizes
> of modern servers, having 1m or even 10m files in cache is really not a
> lot these days.

The fact remains that caching "as much as possible" tends to be harmful,
and some careful limiting would be a good investment.

> There is a second consideration in selecting the data structure for the
> dlm lock/resource etc. tables, which is the locking. Using a simple hash
> table does make it much easier to use RCU based locking which is a great
> advantage if there are a lot of look up operations compared with the
> number of insert/remove operations. That becomes a lot more difficult
> (and requires a much greater lookup to insert/remove ratio) when trees
> are involved.
> 
> We have run into the same kind of issues with GFS2's glock hash table.
> We firstly reduced the size of the hash buckets to a single pointer so
> that we could have a lot more hash heads for the given amount of memory.
> Currently we have 2^16 hash table entries.
> 
> Also, we are now using RCU in order to reduce the locking overhead as
> much as possible. We still have a spinlock for the lru list, but that is
> not taken that much by comparison, and there is no locking in the lookup
> path now except to disable preempt through the small critical section.
> 
> I'm not yet convinced that this is enough, even so. I would like to see
> a tree based system for the glocks at some future point, but the
> complexity of tree based RCU is currently putting me off, despite the
> obvious benefits of a log N (where N is the number of items in the tree)
> lookup compared with the N/2 average lookup time for a hash chain. We'd
> need to add an seq lock on the read side which would take away some of
> the benefit of using RCU in the first place.
> 
> However, where we have scored real performance gains is simply by
> avoiding looking up the glocks at all. Hence why we cache them in the
> inodes, and this is also why the void pointer we pass to the dlm is a
> pointer to the glock, so there are then no hash table lookups when we
> receive dlm replies. I think there is scope to explore this option in
> the dlm in a few places too.
> 
> Our most recent expansion of this concept in gfs2 was the addition of a
> variable in the inode to cache the last resource group used. This
> reduces the number of lookups of the resource group data structure
> dramatically during block allocations and deallocations.
> 
> Anyway, I just wanted to point out some of the tradeoffs of the possible
> choices available in making gfs2/dlm scale to larger numbers of inodes.
> I would very much like to have an RCU based rbtree (or similar) which
> doesn't require the seq_lock if anybody has suggestions on how to
> achieve that,

While you don't want to be blatantly inefficient, I suspect investing time
into things like RCU runs afoul of Amdahl's Law, i.e. you could reduce
the time of the data structure operations to 0 and not affect real world
performance.

When deciding where to focus improvements, I think cache limiting could
produce a big payoff, and data structure optimizations a small payoff.




* [Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree
  2011-10-10 17:01           ` David Teigland
@ 2011-10-10 19:00             ` Steven Whitehouse
  2011-10-10 19:33               ` David Teigland
  0 siblings, 1 reply; 11+ messages in thread
From: Steven Whitehouse @ 2011-10-10 19:00 UTC (permalink / raw)
  To: cluster-devel.redhat.com

Hi,

On Mon, 2011-10-10 at 13:01 -0400, David Teigland wrote:
> On Mon, Oct 10, 2011 at 04:51:01PM +0100, Steven Whitehouse wrote:
> > Hi,
> > 
> > On Mon, 2011-10-10 at 10:43 -0400, David Teigland wrote:
> > > On Sat, Oct 08, 2011 at 06:13:52AM -0400, Bob Peterson wrote:
> > > > ----- Original Message -----
> > > > | On Wed, Oct 05, 2011 at 03:25:39PM -0400, Bob Peterson wrote:
> > > > | > Hi,
> > > > | > 
> > > > | > This upstream patch changes the way DLM keeps track of RSBs.
> > > > | > Before, they were in a linked list off a hash table.  Now,
> > > > | > they're an rb_tree off the same hash table.  This speeds up
> > > > | > DLM lookups greatly.
> > > > | > 
> > > > | > Today's DLM is faster than older DLMs for many file systems,
> > > > | > (e.g. in RHEL5) due to the larger hash table size.  However,
> > > > | > this rb_tree implementation scales much better.  For my
> > > > | > 1000-directories-with-1000-files test, the patch doesn't
> > > > | > show much of an improvement.  But when I scale the file system
> > > > | > to 4000 directories with 4000 files (16 million files), it
> > > > | > helps greatly. The time to do rm -fR /mnt/gfs2/* drops from
> > > > | > 42.01 hours to 23.68 hours.
> > > > | 
> > > > | How many hash table buckets were you using in that test?
> > > > | If it was the default (1024), I'd be interested to know how
> > > > | 16k compares.
> > > > 
> > > > Hi,
> > > > 
> > > > Interestingly, on the stock 2.6.32-206.el6.x86_64 kernel
> > > > and 16K hash buckets, the time was virtually the same as
> > > > with my patch: 1405m46.519s (23.43 hours). So perhaps we
> > > > should re-evaluate whether we should use the rb_tree
> > > > implementation or just increase the hash buckets as needed.
> > > > I guess the question is now mainly related to scaling and
> > > > memory usage for all those hash tables at this point.
> > > 
> > > I'm still interested in possibly using an rbtree with fewer hash buckets.
> > > 
> > > At the same time, I think the bigger problem may be why gfs2 is caching so
> > > many locks in the first place, especially for millions of unlinked files
> > > whose locks will never benefit you again.
> > > 
> > 
> > I doubt that it is caching locks for unlinked files for any great period
> > of time if there is any memory pressure. It is memory pressure which
> > instigates the dropping of glocks relating to inodes usually. If the
> > glock is unlocked then simply dropping the ref count on the glock will
> > cause the deallocation (dlm lock drop) to occur.
> >
> > The reason why this tends to be seen at unlink time is just that
> > unlinking small files is a good way to iterate over a lot of inodes in a
> > relatively short time period, and thus locking can start to dominate the
> > timings if it isn't very low latency. I suspect there are lots of other
> > workloads which would see this (find perhaps?) as well. The faster the
> > array, the more noticeable the effect is likely to be.
> > 
> > Also, we do get some benefit if glocks are cached after inodes have been
> > unlinked. If an inode is then recreated in the same disk block, we'll
> > have all the glocks ready for it, without having to wait for them to be
> > set up from scratch.
> > 
> > I am slightly concerned that we ought to be testing with much greater
> > numbers of inodes for these kinds of tests. Considering the memory sizes
> > of modern servers, having 1m or even 10m files in cache is really not a
> > lot these days.
> 
> The fact remains that caching "as much as possible" tends to be harmful,
> and some careful limiting would be a good investment.
> 
There is a limit. The point is that the limit is dynamic and depends on
memory pressure. The VM requests a reduction in the number of cached
objects when it wants to reduce the size of what we have cached. So the
larger the amount of RAM, the more inodes may be potentially cached.

I don't agree that caching as much as possible (given the constraints
just mentioned) is bad. The more we cache, the more disk I/O is avoided
so the better the performance will be.

> > There is a second consideration in selecting the data structure for the
> > dlm lock/resource etc. tables, which is the locking. Using a simple hash
> > table does make it much easier to use RCU based locking which is a great
> > advantage if there are a lot of look up operations compared with the
> > number of insert/remove operations. That becomes a lot more difficult
> > (and requires a much greater lookup to insert/remove ratio) when trees
> > are involved.
> > 
> > We have run into the same kind of issues with GFS2's glock hash table.
> > We firstly reduced the size of the hash buckets to a single pointer so
> > that we could have a lot more hash heads for the given amount of memory.
> > Currently we have 2^16 hash table entries.
> > 
> > Also, we are now using RCU in order to reduce the locking overhead as
> > much as possible. We still have a spinlock for the lru list, but that is
> > not taken that much by comparison, and there is no locking in the lookup
> > path now except to disable preempt through the small critical section.
> > 
> > I'm not yet convinced that this is enough, even so. I would like to see
> > a tree based system for the glocks at some future point, but the
> > complexity of tree based RCU is currently putting me off, despite the
> > obvious benefits of a log N (where N is the number of items in the tree)
> > lookup compared with the N/2 average lookup time for a hash chain. We'd
> > need to add an seq lock on the read side which would take away some of
> > the benefit of using RCU in the first place.
> > 
> > However, where we have scored real performance gains is simply by
> > avoiding looking up the glocks at all. Hence why we cache them in the
> > inodes, and this is also why the void pointer we pass to the dlm is a
> > pointer to the glock, so there are then no hash table lookups when we
> > receive dlm replies. I think there is scope to explore this option in
> > the dlm in a few places too.
> > 
> > Our most recent expansion of this concept in gfs2 was the addition of a
> > variable in the inode to cache the last resource group used. This
> > reduces the number of lookups of the resource group data structure
> > dramatically during block allocations and deallocations.
> > 
> > Anyway, I just wanted to point out some of the tradeoffs of the possible
> > choices available in making gfs2/dlm scale to larger numbers of inodes.
> > I would very much like to have an RCU based rbtree (or similar) which
> > doesn't require the seq_lock if anybody has suggestions on how to
> > achieve that,
> 
> While you don't want to be blatently inefficient, I suspect investing time
> into things like RCU runs afoul of Amdahl's Law.  i.e. you could reduce
> the time of the data structure operations to 0 and not affect real world
> performance.
> 
> When deciding where to focus improvements, I think cache limting could
> produce a big payoff, and data structure optimizations a small payoff.
> 

We already have a limit (as per comments above) and I'm not sure whether
you are proposing any further constraints here. The fact remains though
that we ought to be able to make maximum use of whatever RAM is
available on the server, and it makes no sense to artificially limit the
amount of RAM used by glocks/dlm locks (i.e. number of objects which are
cached) beyond that point.

As a consequence we need to ensure that locking operations do not slow
down too much as the number of locks increases. I had thought that we
both were agreed on that point, at least, even if not what to do about
it.

The point of RCU is to avoid the cache pollution associated with
reader/writer locks, and that can be a big win, depending on the access
pattern to the data structure. So it does depend a great deal on that
pattern as to what pay off you get, but it was certainly worth doing for
the glocks.

One side effect of this is that nothing can ever block a glock dump now.
That can potentially be useful for debugging, although it was not the
main reason for doing it.

I would like to use RCU for resource groups too - they are "read only"
from mount time to umount time (aside from if we grow the filesystem)
but the tree structure we have for resource groups rather puts me off.
Most of the performance gain we've got so far has been down to the
caching of the resource groups in the inode, and there may be little
left to gain from RCU. That said, we'll be testing it soon I hope, and
we'll see if is worth doing or not,

Steve.





* [Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree
  2011-10-10 19:00             ` Steven Whitehouse
@ 2011-10-10 19:33               ` David Teigland
  0 siblings, 0 replies; 11+ messages in thread
From: David Teigland @ 2011-10-10 19:33 UTC (permalink / raw)
  To: cluster-devel.redhat.com

On Mon, Oct 10, 2011 at 08:00:07PM +0100, Steven Whitehouse wrote:
> > The fact remains that caching "as much as possible" tends to be harmful,
> > and some careful limiting would be a good investment.
> > 
> There is a limit. The point is that the limit is dynamic and depends on
> memory pressure. 

the "as possible" part of "as much as possible".

> The VM requests a reduction in the number of cached
> objects when it wants to reduce the size of what we have cached. So the
> larger the amount of RAM, the more inodes may be potentially cached.
> 
> I don't agree that caching as much as possible (given the constraints
> just mentioned) is bad. 

the "tends to" part meant that it can be good or bad, depending.

> The more we cache, the more disk I/O is avoided so the better the
> performance will be.

You don't need to argue that more caching can be good.  My point is it's
not universally true in practice, as Bob's tests show.




* [Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree
  2011-10-10 14:43       ` David Teigland
  2011-10-10 15:51         ` Steven Whitehouse
@ 2011-10-24 19:47         ` Bob Peterson
  2011-10-25 23:13           ` David Teigland
  1 sibling, 1 reply; 11+ messages in thread
From: Bob Peterson @ 2011-10-24 19:47 UTC (permalink / raw)
  To: cluster-devel.redhat.com

----- Original Message -----
| On Sat, Oct 08, 2011 at 06:13:52AM -0400, Bob Peterson wrote:
| > ----- Original Message -----
| > | On Wed, Oct 05, 2011 at 03:25:39PM -0400, Bob Peterson wrote:
| > | > Hi,
| > | > 
| > | > This upstream patch changes the way DLM keeps track of RSBs.
| > | > Before, they were in a linked list off a hash table.  Now,
| > | > they're an rb_tree off the same hash table.  This speeds up
| > | > DLM lookups greatly.
| > | > 
| > | > Today's DLM is faster than older DLMs for many file systems,
| > | > (e.g. in RHEL5) due to the larger hash table size.  However,
| > | > this rb_tree implementation scales much better.  For my
| > | > 1000-directories-with-1000-files test, the patch doesn't
| > | > show much of an improvement.  But when I scale the file system
| > | > to 4000 directories with 4000 files (16 million files), it
| > | > helps greatly. The time to do rm -fR /mnt/gfs2/* drops from
| > | > 42.01 hours to 23.68 hours.
| > | 
| > | How many hash table buckets were you using in that test?
| > | If it was the default (1024), I'd be interested to know how
| > | 16k compares.
| > 
| > Hi,
| > 
| > Interestingly, on the stock 2.6.32-206.el6.x86_64 kernel
| > and 16K hash buckets, the time was virtually the same as
| > with my patch: 1405m46.519s (23.43 hours). So perhaps we
| > should re-evaluate whether we should use the rb_tree
| > implementation or just increase the hash buckets as needed.
| > I guess the question is now mainly related to scaling and
| > memory usage for all those hash tables at this point.
| 
| I'm still interested in possibly using an rbtree with fewer hash
| buckets.
| 
| At the same time, I think the bigger problem may be why gfs2 is
| caching so
| many locks in the first place, especially for millions of unlinked
| files
| whose locks will never benefit you again.

Hi Dave,

Thanks for all your help and guidance.  Here is my latest
upstream patch for keeping RSBs in a hash table of rb_trees.
It hopefully incorporates all the changes we talked about.
Feel free to change as you see fit.

Regards,

Bob Peterson
Red Hat File Systems
--
commit 3f6dc3a83c27d5f915ebea0f53a5834db0d8b825
Author: Bob Peterson <rpeterso@redhat.com>
Date:   Wed Oct 5 10:04:39 2011 -0500

    DLM: Convert rsb data from linked list to rb_tree

    This patch changes the way DLM keeps track of rsbs.  Before, they
    were in a linked list off a hash table.  Now, they're an rb_tree
    off the same hash table.  This speeds up DLM lookups greatly.

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 5977923..0b035d7 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -393,9 +393,10 @@ static const struct seq_operations format3_seq_ops;
 
 static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 {
+	struct rb_node *node = NULL;
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri;
-	struct dlm_rsb *r;
+	struct dlm_rsb *r = NULL;
 	loff_t n = *pos;
 	unsigned bucket, entry;
 
@@ -418,9 +419,10 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		ri->format = 3;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-		list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list,
-				    res_hashchain) {
+	if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+		for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node;
+		     node = rb_next(&r->res_hashnode)) {
+			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			if (!entry--) {
 				dlm_hold_rsb(r);
 				ri->rsb = r;
@@ -449,9 +451,8 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		}
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-			r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-					     struct dlm_rsb, res_hashchain);
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
@@ -467,7 +468,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 {
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri = iter_ptr;
-	struct list_head *next;
+	struct rb_node *next;
 	struct dlm_rsb *r, *rp;
 	loff_t n = *pos;
 	unsigned bucket;
@@ -480,10 +481,10 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
 	rp = ri->rsb;
-	next = rp->res_hashchain.next;
+	next = rb_next(&rp->res_hashnode);
 
-	if (next != &ls->ls_rsbtbl[bucket].list) {
-		r = list_entry(next, struct dlm_rsb, res_hashchain);
+	if (next) {
+		r = rb_entry(next, struct dlm_rsb, res_hashnode);
 		dlm_hold_rsb(r);
 		ri->rsb = r;
 		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -511,9 +512,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 		}
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-			r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-					     struct dlm_rsb, res_hashchain);
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+			next = rb_first(&ls->ls_rsbtbl[bucket].keep);
+			r = rb_entry(next, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index fe2860c..5685a9a 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -103,8 +103,8 @@ struct dlm_dirtable {
 };
 
 struct dlm_rsbtable {
-	struct list_head	list;
-	struct list_head	toss;
+	struct rb_root		keep;
+	struct rb_root		toss;
 	spinlock_t		lock;
 };
 
@@ -285,7 +285,10 @@ struct dlm_rsb {
 	unsigned long		res_toss_time;
 	uint32_t		res_first_lkid;
 	struct list_head	res_lookup;	/* lkbs waiting on first */
-	struct list_head	res_hashchain;	/* rsbtbl */
+	union {
+		struct list_head	res_hashchain;
+		struct rb_node		res_hashnode;	/* rsbtbl */
+	};
 	struct list_head	res_grantqueue;
 	struct list_head	res_convertqueue;
 	struct list_head	res_waitqueue;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 83b5e32..05afb15 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -56,6 +56,7 @@
    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
 */
 #include <linux/types.h>
+#include <linux/rbtree.h>
 #include <linux/slab.h>
 #include "dlm_internal.h"
 #include <linux/dlm_device.h>
@@ -380,6 +381,8 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 
 	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
 	list_del(&r->res_hashchain);
+	/* Convert the empty list_head to a NULL rb_node for tree usage: */
+	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
 	ls->ls_new_rsb_count--;
 	spin_unlock(&ls->ls_new_rsb_spin);
 
@@ -388,7 +391,6 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 	memcpy(r->res_name, name, len);
 	mutex_init(&r->res_mutex);
 
-	INIT_LIST_HEAD(&r->res_hashchain);
 	INIT_LIST_HEAD(&r->res_lookup);
 	INIT_LIST_HEAD(&r->res_grantqueue);
 	INIT_LIST_HEAD(&r->res_convertqueue);
@@ -400,14 +402,38 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 	return 0;
 }
 
-static int search_rsb_list(struct list_head *head, char *name, int len,
+/*
+ * rsb_cmp - compare an rsb against a name and length
+ * Returns:
+ *      0 - if the name and length match exactly
+ *     -1 - if the name is "less than" the rsb name
+ *      1 - if the name is "greater than" the rsb name
+ */
+static inline int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
+{
+	char maxname[DLM_RESNAME_MAXLEN];
+
+	memset(maxname, 0, DLM_RESNAME_MAXLEN);
+	memcpy(maxname, name, nlen);
+	return memcmp(maxname, r->res_name, DLM_RESNAME_MAXLEN);
+}
+
+static int search_rsb_tree(struct rb_root *tree, char *name, int len,
 			   unsigned int flags, struct dlm_rsb **r_ret)
 {
-	struct dlm_rsb *r;
+	struct rb_node **node = &tree->rb_node;
+	struct dlm_rsb *r = NULL;
 	int error = 0;
-
-	list_for_each_entry(r, head, res_hashchain) {
-		if (len == r->res_length && !memcmp(name, r->res_name, len))
+	int rc;
+
+	while (*node) {
+		r = rb_entry(*node, struct dlm_rsb, res_hashnode);
+		rc = rsb_cmp(r, name, len);
+		if (rc < 0)
+			node = &((*node)->rb_left);
+		else if (rc > 0)
+			node = &((*node)->rb_right);
+		else
 			goto found;
 	}
 	*r_ret = NULL;
@@ -420,22 +446,48 @@ static int search_rsb_list(struct list_head *head, char *name, int len,
 	return error;
 }
 
+static void rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
+{
+	struct rb_node **newn = &tree->rb_node, *parent = NULL;
+	int rc;
+	const char *name = rsb->res_name; /* This is just for clarity */
+
+	/* Figure out where to put new node */
+	while (*newn) {
+		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
+					       res_hashnode);
+
+		parent = *newn;
+		rc = rsb_cmp(cur, name, rsb->res_length);
+		if (rc < 0)
+			newn = &((*newn)->rb_left);
+		else if (rc > 0)
+			newn = &((*newn)->rb_right);
+		else
+			return;
+	}
+
+	rb_link_node(&rsb->res_hashnode, parent, newn);
+	rb_insert_color(&rsb->res_hashnode, tree);
+}
+
 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 		       unsigned int flags, struct dlm_rsb **r_ret)
 {
 	struct dlm_rsb *r;
 	int error;
 
-	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
+	error = search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
 	if (!error) {
 		kref_get(&r->res_ref);
 		goto out;
 	}
-	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
+	error = search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
 	if (error)
 		goto out;
 
-	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+	rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 
 	if (dlm_no_directory(ls))
 		goto out;
@@ -527,7 +579,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 			nodeid = 0;
 		r->res_nodeid = nodeid;
 	}
-	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
+	rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
 	error = 0;
  out_unlock:
 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -556,7 +608,8 @@ static void toss_rsb(struct kref *kref)
 
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 	kref_init(&r->res_ref);
-	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
+	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
 	r->res_toss_time = jiffies;
 	if (r->res_lvbptr) {
 		dlm_free_lvb(r->res_lvbptr);
@@ -1087,14 +1140,20 @@ static void dir_remove(struct dlm_rsb *r)
 
 static int shrink_bucket(struct dlm_ls *ls, int b)
 {
+	struct rb_node *n = NULL;
 	struct dlm_rsb *r;
 	int count = 0, found;
 
 	for (;;) {
 		found = 0;
 		spin_lock(&ls->ls_rsbtbl[b].lock);
-		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
-					    res_hashchain) {
+		for (n = rb_first(&ls->ls_rsbtbl[b].toss); n;
+		     n = rb_next(&r->res_hashnode)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+			if (unlikely(&r->res_hashnode == n)) {
+				spin_unlock(&ls->ls_rsbtbl[b].lock);
+				return 0;
+			}
 			if (!time_after_eq(jiffies, r->res_toss_time +
 					   dlm_config.ci_toss_secs * HZ))
 				continue;
@@ -1108,7 +1167,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
 		}
 
 		if (kref_put(&r->res_ref, kill_rsb)) {
-			list_del(&r->res_hashchain);
+			rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
 			spin_unlock(&ls->ls_rsbtbl[b].lock);
 
 			if (is_master(r))
@@ -4441,10 +4500,13 @@ int dlm_purge_locks(struct dlm_ls *ls)
 
 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
 {
+	struct rb_node *n = NULL;
 	struct dlm_rsb *r, *r_ret = NULL;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
+	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n;
+	     n = rb_next(&r->res_hashnode)) {
+		r = rb_entry(n, struct dlm_rsb, res_hashnode);
 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
 			continue;
 		hold_rsb(r);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index a1d8f1a..f58b4ac 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -457,8 +457,8 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
 	if (!ls->ls_rsbtbl)
 		goto out_lsfree;
 	for (i = 0; i < size; i++) {
-		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
-		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
+		ls->ls_rsbtbl[i].keep.rb_node = NULL;
+		ls->ls_rsbtbl[i].toss.rb_node = NULL;
 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
 	}
 
@@ -472,7 +472,7 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
 	if (!ls->ls_dirtbl)
 		goto out_lkbfree;
 	for (i = 0; i < size; i++) {
-		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
+		ls->ls_dirtbl[i].root.rb_node = NULL;
 		spin_lock_init(&ls->ls_dirtbl[i].lock);
 	}
 
@@ -685,7 +685,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
 	struct dlm_rsb *rsb;
-	struct list_head *head;
+	struct rb_node *n = NULL;
 	int i, busy, rv;
 
 	busy = lockspace_busy(ls, force);
@@ -746,26 +746,22 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	 */
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		head = &ls->ls_rsbtbl[i].list;
-		while (!list_empty(head)) {
-			rsb = list_entry(head->next, struct dlm_rsb,
-					 res_hashchain);
-
-			list_del(&rsb->res_hashchain);
+		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].keep);
 			dlm_free_rsb(rsb);
 		}
 
-		head = &ls->ls_rsbtbl[i].toss;
-		while (!list_empty(head)) {
-			rsb = list_entry(head->next, struct dlm_rsb,
-					 res_hashchain);
-			list_del(&rsb->res_hashchain);
+		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].toss);
 			dlm_free_rsb(rsb);
 		}
 	}
 
 	vfree(ls->ls_rsbtbl);
 
+	/* Free up the new rsb list */
 	while (!list_empty(&ls->ls_new_rsb)) {
 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
 				       res_hashchain);
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index da64df7..81b4802 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -64,6 +64,7 @@ struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
 	struct dlm_rsb *r;
 
 	r = kmem_cache_zalloc(rsb_cache, GFP_NOFS);
+	INIT_LIST_HEAD(&r->res_hashchain);
 	return r;
 }
 
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 1463823..ff5f7b4 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -715,6 +715,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
 
 int dlm_create_root_list(struct dlm_ls *ls)
 {
+	struct rb_node *n = NULL;
 	struct dlm_rsb *r;
 	int i, error = 0;
 
@@ -727,7 +728,9 @@ int dlm_create_root_list(struct dlm_ls *ls)
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n;
+		     n = rb_next(&r->res_hashnode)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, &ls->ls_root_list);
 			dlm_hold_rsb(r);
 		}
@@ -741,7 +744,9 @@ int dlm_create_root_list(struct dlm_ls *ls)
 			continue;
 		}
 
-		list_for_each_entry(r, &ls->ls_rsbtbl[i].toss, res_hashchain) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n;
+		     n = rb_next(&r->res_hashnode)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, &ls->ls_root_list);
 			dlm_hold_rsb(r);
 		}
@@ -771,16 +776,19 @@ void dlm_release_root_list(struct dlm_ls *ls)
 
 void dlm_clear_toss_list(struct dlm_ls *ls)
 {
-	struct dlm_rsb *r, *safe;
+	struct rb_node *node, *next = NULL;
+	struct dlm_rsb *rsb;
 	int i;
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss,
-					 res_hashchain) {
-			if (dlm_no_directory(ls) || !is_master(r)) {
-				list_del(&r->res_hashchain);
-				dlm_free_rsb(r);
+		for (node = rb_first(&ls->ls_rsbtbl[i].toss); node;
+		     node = next) {
+			rsb = rb_entry(node, struct dlm_rsb, res_hashnode);
+			next = rb_next(&rsb->res_hashnode);
+			if (dlm_no_directory(ls) || !is_master(rsb)) {
+				rb_erase(node, &ls->ls_rsbtbl[i].toss);
+				dlm_free_rsb(rsb);
 			}
 		}
 		spin_unlock(&ls->ls_rsbtbl[i].lock);




* [Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree
  2011-10-24 19:47         ` Bob Peterson
@ 2011-10-25 23:13           ` David Teigland
  2011-10-26 17:28             ` Bob Peterson
  0 siblings, 1 reply; 11+ messages in thread
From: David Teigland @ 2011-10-25 23:13 UTC (permalink / raw)
  To: cluster-devel.redhat.com

Hi Bob, I've made a few minor/cosmetic changes and attached my current
version (not tested yet).

>  static int shrink_bucket(struct dlm_ls *ls, int b)
>  {
> +	struct rb_node *n = NULL;
>  	struct dlm_rsb *r;
>  	int count = 0, found;
>  
>  	for (;;) {
>  		found = 0;
>  		spin_lock(&ls->ls_rsbtbl[b].lock);
> -		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
> -					    res_hashchain) {
> +		for (n = rb_first(&ls->ls_rsbtbl[b].toss); n;
> +		     n = rb_next(&r->res_hashnode)) {
> +			r = rb_entry(n, struct dlm_rsb, res_hashnode);
> +			if (unlikely(&r->res_hashnode == n)) {
> +				spin_unlock(&ls->ls_rsbtbl[b].lock);
> +				return 0;
> +			}

Isn't that unlikely statement actually always true?

>  			if (!time_after_eq(jiffies, r->res_toss_time +
>  					   dlm_config.ci_toss_secs * HZ))
>  				continue;
> @@ -1108,7 +1167,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
>  		}
>  
>  		if (kref_put(&r->res_ref, kill_rsb)) {
> -			list_del(&r->res_hashchain);
> +			rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
>  			spin_unlock(&ls->ls_rsbtbl[b].lock);
>  
>  			if (is_master(r))


-------------- next part --------------
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 5977923..a834f6b 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -393,6 +393,7 @@ static const struct seq_operations format3_seq_ops;
 
 static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 {
+	struct rb_node *node;
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri;
 	struct dlm_rsb *r;
@@ -418,9 +419,10 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		ri->format = 3;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-		list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list,
-				    res_hashchain) {
+	if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+		for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node;
+		     node = rb_next(node)) {
+			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			if (!entry--) {
 				dlm_hold_rsb(r);
 				ri->rsb = r;
@@ -449,9 +451,8 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		}
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-			r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-					     struct dlm_rsb, res_hashchain);
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
@@ -467,7 +468,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 {
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri = iter_ptr;
-	struct list_head *next;
+	struct rb_node *next;
 	struct dlm_rsb *r, *rp;
 	loff_t n = *pos;
 	unsigned bucket;
@@ -480,10 +481,10 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
 	rp = ri->rsb;
-	next = rp->res_hashchain.next;
+	next = rb_next(&rp->res_hashnode);
 
-	if (next != &ls->ls_rsbtbl[bucket].list) {
-		r = list_entry(next, struct dlm_rsb, res_hashchain);
+	if (next) {
+		r = rb_entry(next, struct dlm_rsb, res_hashnode);
 		dlm_hold_rsb(r);
 		ri->rsb = r;
 		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -511,9 +512,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 		}
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-			r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-					     struct dlm_rsb, res_hashchain);
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+			next = rb_first(&ls->ls_rsbtbl[bucket].keep);
+			r = rb_entry(next, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index fe2860c..5685a9a 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -103,8 +103,8 @@ struct dlm_dirtable {
 };
 
 struct dlm_rsbtable {
-	struct list_head	list;
-	struct list_head	toss;
+	struct rb_root		keep;
+	struct rb_root		toss;
 	spinlock_t		lock;
 };
 
@@ -285,7 +285,10 @@ struct dlm_rsb {
 	unsigned long		res_toss_time;
 	uint32_t		res_first_lkid;
 	struct list_head	res_lookup;	/* lkbs waiting on first */
-	struct list_head	res_hashchain;	/* rsbtbl */
+	union {
+		struct list_head	res_hashchain;
+		struct rb_node		res_hashnode;	/* rsbtbl */
+	};
 	struct list_head	res_grantqueue;
 	struct list_head	res_convertqueue;
 	struct list_head	res_waitqueue;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 83b5e32..1373f62 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -56,6 +56,7 @@
    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
 */
 #include <linux/types.h>
+#include <linux/rbtree.h>
 #include <linux/slab.h>
 #include "dlm_internal.h"
 #include <linux/dlm_device.h>
@@ -380,6 +381,8 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 
 	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
 	list_del(&r->res_hashchain);
+	/* Convert the empty list_head to a NULL rb_node for tree usage: */
+	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
 	ls->ls_new_rsb_count--;
 	spin_unlock(&ls->ls_new_rsb_spin);
 
@@ -388,7 +391,6 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 	memcpy(r->res_name, name, len);
 	mutex_init(&r->res_mutex);
 
-	INIT_LIST_HEAD(&r->res_hashchain);
 	INIT_LIST_HEAD(&r->res_lookup);
 	INIT_LIST_HEAD(&r->res_grantqueue);
 	INIT_LIST_HEAD(&r->res_convertqueue);
@@ -400,14 +402,31 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 	return 0;
 }
 
-static int search_rsb_list(struct list_head *head, char *name, int len,
+static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
+{
+	char maxname[DLM_RESNAME_MAXLEN];
+
+	memset(maxname, 0, DLM_RESNAME_MAXLEN);
+	memcpy(maxname, name, nlen);
+	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
+}
+
+static int search_rsb_tree(struct rb_root *tree, char *name, int len,
 			   unsigned int flags, struct dlm_rsb **r_ret)
 {
+	struct rb_node **node = &tree->rb_node;
 	struct dlm_rsb *r;
 	int error = 0;
-
-	list_for_each_entry(r, head, res_hashchain) {
-		if (len == r->res_length && !memcmp(name, r->res_name, len))
+	int rc;
+
+	while (*node) {
+		r = rb_entry(*node, struct dlm_rsb, res_hashnode);
+		rc = rsb_cmp(r, name, len);
+		if (rc < 0)
+			node = &((*node)->rb_left);
+		else if (rc > 0)
+			node = &((*node)->rb_right);
+		else
 			goto found;
 	}
 	*r_ret = NULL;
@@ -420,22 +439,54 @@ static int search_rsb_list(struct list_head *head, char *name, int len,
 	return error;
 }
 
+static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
+{
+	struct rb_node **newn = &tree->rb_node;
+	struct rb_node *parent = NULL;
+	int rc;
+
+	while (*newn) {
+		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
+					       res_hashnode);
+
+		parent = *newn;
+		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
+		if (rc < 0)
+			newn = &parent->rb_left;
+		else if (rc > 0)
+			newn = &parent->rb_right;
+		else {
+			log_print("rsb_insert match");
+			dlm_dump_rsb(rsb);
+			dlm_dump_rsb(cur);
+			return -EEXIST;
+		}
+	}
+
+	rb_link_node(&rsb->res_hashnode, parent, newn);
+	rb_insert_color(&rsb->res_hashnode, tree);
+	return 0;
+}
+
 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 		       unsigned int flags, struct dlm_rsb **r_ret)
 {
 	struct dlm_rsb *r;
 	int error;
 
-	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
+	error = search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
 	if (!error) {
 		kref_get(&r->res_ref);
 		goto out;
 	}
-	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
+	error = search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
 	if (error)
 		goto out;
 
-	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+	if (error)
+		return error;
 
 	if (dlm_no_directory(ls))
 		goto out;
@@ -527,8 +578,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 			nodeid = 0;
 		r->res_nodeid = nodeid;
 	}
-	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
-	error = 0;
+	error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
  out_unlock:
 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
  out:
@@ -556,7 +606,8 @@ static void toss_rsb(struct kref *kref)
 
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 	kref_init(&r->res_ref);
-	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
+	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
 	r->res_toss_time = jiffies;
 	if (r->res_lvbptr) {
 		dlm_free_lvb(r->res_lvbptr);
@@ -1082,19 +1133,19 @@ static void dir_remove(struct dlm_rsb *r)
 				     r->res_name, r->res_length);
 }
 
-/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
-   found since they are in order of newest to oldest? */
+/* FIXME: make this more efficient */
 
 static int shrink_bucket(struct dlm_ls *ls, int b)
 {
+	struct rb_node *n;
 	struct dlm_rsb *r;
 	int count = 0, found;
 
 	for (;;) {
 		found = 0;
 		spin_lock(&ls->ls_rsbtbl[b].lock);
-		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
-					    res_hashchain) {
+		for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			if (!time_after_eq(jiffies, r->res_toss_time +
 					   dlm_config.ci_toss_secs * HZ))
 				continue;
@@ -1108,7 +1159,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
 		}
 
 		if (kref_put(&r->res_ref, kill_rsb)) {
-			list_del(&r->res_hashchain);
+			rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
 			spin_unlock(&ls->ls_rsbtbl[b].lock);
 
 			if (is_master(r))
@@ -4441,10 +4492,12 @@ int dlm_purge_locks(struct dlm_ls *ls)
 
 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
 {
+	struct rb_node *n;
 	struct dlm_rsb *r, *r_ret = NULL;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
+	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
+		r = rb_entry(n, struct dlm_rsb, res_hashnode);
 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
 			continue;
 		hold_rsb(r);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index a1d8f1a..3a1319c 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -457,8 +457,8 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
 	if (!ls->ls_rsbtbl)
 		goto out_lsfree;
 	for (i = 0; i < size; i++) {
-		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
-		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
+		ls->ls_rsbtbl[i].keep.rb_node = NULL;
+		ls->ls_rsbtbl[i].toss.rb_node = NULL;
 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
 	}
 
@@ -685,7 +685,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
 	struct dlm_rsb *rsb;
-	struct list_head *head;
+	struct rb_node *n;
 	int i, busy, rv;
 
 	busy = lockspace_busy(ls, force);
@@ -746,26 +746,22 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	 */
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		head = &ls->ls_rsbtbl[i].list;
-		while (!list_empty(head)) {
-			rsb = list_entry(head->next, struct dlm_rsb,
-					 res_hashchain);
-
-			list_del(&rsb->res_hashchain);
+		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].keep);
 			dlm_free_rsb(rsb);
 		}
 
-		head = &ls->ls_rsbtbl[i].toss;
-		while (!list_empty(head)) {
-			rsb = list_entry(head->next, struct dlm_rsb,
-					 res_hashchain);
-			list_del(&rsb->res_hashchain);
+		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].toss);
 			dlm_free_rsb(rsb);
 		}
 	}
 
 	vfree(ls->ls_rsbtbl);
 
+	/* Free up the new rsb list */
 	while (!list_empty(&ls->ls_new_rsb)) {
 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
 				       res_hashchain);
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index da64df7..81b4802 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -64,6 +64,7 @@ struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
 	struct dlm_rsb *r;
 
 	r = kmem_cache_zalloc(rsb_cache, GFP_NOFS);
+	INIT_LIST_HEAD(&r->res_hashchain);
 	return r;
 }
 
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 1463823..50467ce 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -715,6 +715,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
 
 int dlm_create_root_list(struct dlm_ls *ls)
 {
+	struct rb_node *n;
 	struct dlm_rsb *r;
 	int i, error = 0;
 
@@ -727,7 +728,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, &ls->ls_root_list);
 			dlm_hold_rsb(r);
 		}
@@ -741,7 +743,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
 			continue;
 		}
 
-		list_for_each_entry(r, &ls->ls_rsbtbl[i].toss, res_hashchain) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, &ls->ls_root_list);
 			dlm_hold_rsb(r);
 		}
@@ -771,16 +774,18 @@ void dlm_release_root_list(struct dlm_ls *ls)
 
 void dlm_clear_toss_list(struct dlm_ls *ls)
 {
-	struct dlm_rsb *r, *safe;
+	struct rb_node *n, *next;
+	struct dlm_rsb *rsb;
 	int i;
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss,
-					 res_hashchain) {
-			if (dlm_no_directory(ls) || !is_master(r)) {
-				list_del(&r->res_hashchain);
-				dlm_free_rsb(r);
+		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
+			next = rb_next(n);
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			if (dlm_no_directory(ls) || !is_master(rsb)) {
+				rb_erase(n, &ls->ls_rsbtbl[i].toss);
+				dlm_free_rsb(rsb);
 			}
 		}
 		spin_unlock(&ls->ls_rsbtbl[i].lock);


* [Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree
  2011-10-25 23:13           ` David Teigland
@ 2011-10-26 17:28             ` Bob Peterson
  0 siblings, 0 replies; 11+ messages in thread
From: Bob Peterson @ 2011-10-26 17:28 UTC (permalink / raw)
  To: cluster-devel.redhat.com

----- Original Message -----
| Hi Bob, I've made a few minor/cosmetic changes and attached my
| current
| version (not tested yet).
(snip)
| > +			if (unlikely(&r->res_hashnode == n)) {
| > +				spin_unlock(&ls->ls_rsbtbl[b].lock);
| > +				return 0;
| > +			}
| 
| Isn't that unlikely statement actually always true?
| 

Hi Dave,

You're right. I think that section should be removed.
What I coded was clearly not what I intended.  I originally
intended it to be a safeguard against a case where the rb_tree
was empty but we still had a node pointer.
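
For reference, here is a tiny standalone sketch (simplified types, not the real DLM
structures) of why the check can never fire: rb_entry() is just container_of(), so
when r is derived from n, &r->res_hashnode is the same pointer as n by construction:

#include <stdio.h>
#include <stddef.h>

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct fake_node { struct fake_node *left, *right; };
struct fake_rsb  { int res_length; struct fake_node res_hashnode; };

int main(void)
{
	struct fake_rsb rsb = { .res_length = 0 };
	struct fake_node *n = &rsb.res_hashnode;
	struct fake_rsb *r = container_of(n, struct fake_rsb, res_hashnode);

	/* Always prints 1: the round trip gives back the same pointer,
	 * so "&r->res_hashnode == n" can never be false inside the loop. */
	printf("%d\n", &r->res_hashnode == n);
	return 0;
}

The empty-tree case is already covered without any extra guard, since rb_first()
returns NULL on an empty tree and the for loop simply never runs.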

Regards,

Bob Peterson
Red Hat File Systems



