Re: [Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree



----- Original Message -----
| On Sat, Oct 08, 2011 at 06:13:52AM -0400, Bob Peterson wrote:
| > ----- Original Message -----
| > | On Wed, Oct 05, 2011 at 03:25:39PM -0400, Bob Peterson wrote:
| > | > Hi,
| > | > 
| > | > This upstream patch changes the way DLM keeps track of RSBs.
| > | > Before, they were in a linked list off a hash table.  Now,
| > | > they're an rb_tree off the same hash table.  This speeds up
| > | > DLM lookups greatly.
| > | > 
| > | > Today's DLM is already faster than older DLMs (e.g. the one in
| > | > RHEL5) for many file systems, thanks to its larger hash table.
| > | > However, this rb_tree implementation scales much better.  For my
| > | > 1000-directories-with-1000-files test, the patch doesn't
| > | > show much of an improvement.  But when I scale the file system
| > | > to 4000 directories with 4000 files (16 million files), it
| > | > helps greatly. The time to do rm -fR /mnt/gfs2/* drops from
| > | > 42.01 hours to 23.68 hours.
| > | 
| > | How many hash table buckets were you using in that test?
| > | If it was the default (1024), I'd be interested to know how
| > | 16k compares.
| > 
| > Hi,
| > 
| > Interestingly, with the stock 2.6.32-206.el6.x86_64 kernel
| > and 16K hash buckets, the time was virtually the same as
| > with my patch: 1405m46.519s (23.43 hours). So perhaps we
| > should re-evaluate whether to use the rb_tree implementation
| > or just increase the number of hash buckets as needed.
| > I guess the question now mainly comes down to scaling and
| > memory usage for all those hash buckets.
| 
| I'm still interested in possibly using an rbtree with fewer hash
| buckets.
| 
| At the same time, I think the bigger problem may be why gfs2 is
| caching so many locks in the first place, especially for millions
| of unlinked files whose locks will never benefit you again.

Hi Dave,

Thanks for all your help and guidance.  Here is my latest
upstream patch for keeping RSBs in a hash table of rb_trees.
It hopefully incorporates all the changes we talked about.
Feel free to change as you see fit.
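
For reference, here is the rough arithmetic behind the bucket-count
question above (illustrative numbers, not measurements).  With on the
order of N = 16 million cached resources spread across B hash buckets,
a linear scan of a bucket's chain averages about N/(2*B) name
comparisons, while an rb_tree search of the same bucket takes about
log2(N/B):

    B =  1,024 buckets:  ~7,800 list comparisons vs. ~14 tree comparisons
    B = 16,384 buckets:    ~490 list comparisons vs. ~10 tree comparisons

which suggests that once the per-bucket chains drop to a few hundred
entries the lookup stops dominating the rm -fR time, matching the
nearly identical results above.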

Regards,

Bob Peterson
Red Hat File Systems
--
commit 3f6dc3a83c27d5f915ebea0f53a5834db0d8b825
Author: Bob Peterson <rpeterso@redhat.com>
Date:   Wed Oct 5 10:04:39 2011 -0500

DLM: Convert rsb data from linked list to rb_tree

This patch changes the way DLM keeps track of rsbs.  Before, they
were in a linked list off a hash table.  Now, they're an rb_tree
off the same hash table.  This speeds up DLM lookups greatly.

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 5977923..0b035d7 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -393,9 +393,10 @@ static const struct seq_operations format3_seq_ops;
 
 static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 {
+	struct rb_node *node = NULL;
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri;
-	struct dlm_rsb *r;
+	struct dlm_rsb *r = NULL;
 	loff_t n = *pos;
 	unsigned bucket, entry;
 
@@ -418,9 +419,10 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		ri->format = 3;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-		list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list,
-				    res_hashchain) {
+	if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+		for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node;
+		     node = rb_next(&r->res_hashnode)) {
+			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			if (!entry--) {
 				dlm_hold_rsb(r);
 				ri->rsb = r;
@@ -449,9 +451,9 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		}
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-			r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-					     struct dlm_rsb, res_hashchain);
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+			node = rb_first(&ls->ls_rsbtbl[bucket].keep);
+			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
@@ -467,7 +468,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 {
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri = iter_ptr;
-	struct list_head *next;
+	struct rb_node *next;
 	struct dlm_rsb *r, *rp;
 	loff_t n = *pos;
 	unsigned bucket;
@@ -480,10 +481,10 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
 	rp = ri->rsb;
-	next = rp->res_hashchain.next;
+	next = rb_next(&rp->res_hashnode);
 
-	if (next != &ls->ls_rsbtbl[bucket].list) {
-		r = list_entry(next, struct dlm_rsb, res_hashchain);
+	if (next) {
+		r = rb_entry(next, struct dlm_rsb, res_hashnode);
 		dlm_hold_rsb(r);
 		ri->rsb = r;
 		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -511,9 +512,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 		}
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-			r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-					     struct dlm_rsb, res_hashchain);
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+			next = rb_first(&ls->ls_rsbtbl[bucket].keep);
+			r = rb_entry(next, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index fe2860c..5685a9a 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -103,8 +103,8 @@ struct dlm_dirtable {
 };
 
 struct dlm_rsbtable {
-	struct list_head	list;
-	struct list_head	toss;
+	struct rb_root		keep;
+	struct rb_root		toss;
 	spinlock_t		lock;
 };
 
@@ -285,7 +285,10 @@ struct dlm_rsb {
 	unsigned long		res_toss_time;
 	uint32_t		res_first_lkid;
 	struct list_head	res_lookup;	/* lkbs waiting on first */
-	struct list_head	res_hashchain;	/* rsbtbl */
+	union {
+		struct list_head	res_hashchain;
+		struct rb_node		res_hashnode;	/* rsbtbl */
+	};
 	struct list_head	res_grantqueue;
 	struct list_head	res_convertqueue;
 	struct list_head	res_waitqueue;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 83b5e32..05afb15 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -56,6 +56,7 @@
    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
 */
 #include <linux/types.h>
+#include <linux/rbtree.h>
 #include <linux/slab.h>
 #include "dlm_internal.h"
 #include <linux/dlm_device.h>
@@ -380,6 +381,8 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 
 	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
 	list_del(&r->res_hashchain);
+	/* Convert the empty list_head to a NULL rb_node for tree usage: */
+	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
 	ls->ls_new_rsb_count--;
 	spin_unlock(&ls->ls_new_rsb_spin);
 
@@ -388,7 +391,6 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 	memcpy(r->res_name, name, len);
 	mutex_init(&r->res_mutex);
 
-	INIT_LIST_HEAD(&r->res_hashchain);
 	INIT_LIST_HEAD(&r->res_lookup);
 	INIT_LIST_HEAD(&r->res_grantqueue);
 	INIT_LIST_HEAD(&r->res_convertqueue);
@@ -400,14 +402,38 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 	return 0;
 }
 
-static int search_rsb_list(struct list_head *head, char *name, int len,
+/*
+ * rsb_cmp - compare a resource name/length against an rsb's name
+ * Returns (memcmp-style):
+ *      zero     - the names match exactly
+ *      negative - the name sorts before the rsb's name
+ *      positive - the name sorts after the rsb's name
+ */
+static inline int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
+{
+	char maxname[DLM_RESNAME_MAXLEN];
+
+	memset(maxname, 0, DLM_RESNAME_MAXLEN);
+	memcpy(maxname, name, nlen);
+	return memcmp(maxname, r->res_name, DLM_RESNAME_MAXLEN);
+}
+
+static int search_rsb_tree(struct rb_root *tree, char *name, int len,
 			   unsigned int flags, struct dlm_rsb **r_ret)
 {
-	struct dlm_rsb *r;
+	struct rb_node **node = &tree->rb_node;
+	struct dlm_rsb *r = NULL;
 	int error = 0;
-
-	list_for_each_entry(r, head, res_hashchain) {
-		if (len == r->res_length && !memcmp(name, r->res_name, len))
+	int rc;
+
+	while (*node) {
+		r = rb_entry(*node, struct dlm_rsb, res_hashnode);
+		rc = rsb_cmp(r, name, len);
+		if (rc < 0)
+			node = &((*node)->rb_left);
+		else if (rc > 0)
+			node = &((*node)->rb_right);
+		else
 			goto found;
 	}
 	*r_ret = NULL;
@@ -420,22 +446,48 @@ static int search_rsb_list(struct list_head *head, char *name, int len,
 	return error;
 }
 
+static void rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
+{
+	struct rb_node **newn = &tree->rb_node, *parent = NULL;
+	int rc;
+	const char *name = rsb->res_name; /* This is just for clarity */
+
+	/* Figure out where to put new node */
+	while (*newn) {
+		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
+					       res_hashnode);
+
+		parent = *newn;
+		rc = rsb_cmp(cur, name, rsb->res_length);
+		if (rc < 0)
+			newn = &((*newn)->rb_left);
+		else if (rc > 0)
+			newn = &((*newn)->rb_right);
+		else
+			return;
+	}
+
+	rb_link_node(&rsb->res_hashnode, parent, newn);
+	rb_insert_color(&rsb->res_hashnode, tree);
+}
+
 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 		       unsigned int flags, struct dlm_rsb **r_ret)
 {
 	struct dlm_rsb *r;
 	int error;
 
-	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
+	error = search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
 	if (!error) {
 		kref_get(&r->res_ref);
 		goto out;
 	}
-	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
+	error = search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
 	if (error)
 		goto out;
 
-	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+	rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 
 	if (dlm_no_directory(ls))
 		goto out;
@@ -527,7 +579,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 			nodeid = 0;
 		r->res_nodeid = nodeid;
 	}
-	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
+	rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
 	error = 0;
  out_unlock:
 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -556,7 +608,8 @@ static void toss_rsb(struct kref *kref)
 
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 	kref_init(&r->res_ref);
-	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
+	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
 	r->res_toss_time = jiffies;
 	if (r->res_lvbptr) {
 		dlm_free_lvb(r->res_lvbptr);
@@ -1087,14 +1140,16 @@ static void dir_remove(struct dlm_rsb *r)
 
 static int shrink_bucket(struct dlm_ls *ls, int b)
 {
+	struct rb_node *n = NULL;
 	struct dlm_rsb *r;
 	int count = 0, found;
 
 	for (;;) {
 		found = 0;
 		spin_lock(&ls->ls_rsbtbl[b].lock);
-		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
-					    res_hashchain) {
+		for (n = rb_first(&ls->ls_rsbtbl[b].toss); n;
+		     n = rb_next(&r->res_hashnode)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			if (!time_after_eq(jiffies, r->res_toss_time +
 					   dlm_config.ci_toss_secs * HZ))
 				continue;
@@ -1108,7 +1167,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
 		}
 
 		if (kref_put(&r->res_ref, kill_rsb)) {
-			list_del(&r->res_hashchain);
+			rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
 			spin_unlock(&ls->ls_rsbtbl[b].lock);
 
 			if (is_master(r))
@@ -4441,10 +4500,13 @@ int dlm_purge_locks(struct dlm_ls *ls)
 
 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
 {
+	struct rb_node *n = NULL;
 	struct dlm_rsb *r, *r_ret = NULL;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
+	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n;
+	     n = rb_next(&r->res_hashnode)) {
+		r = rb_entry(n, struct dlm_rsb, res_hashnode);
 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
 			continue;
 		hold_rsb(r);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index a1d8f1a..f58b4ac 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -457,8 +457,8 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
 	if (!ls->ls_rsbtbl)
 		goto out_lsfree;
 	for (i = 0; i < size; i++) {
-		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
-		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
+		ls->ls_rsbtbl[i].keep.rb_node = NULL;
+		ls->ls_rsbtbl[i].toss.rb_node = NULL;
 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
 	}
 
@@ -472,7 +472,7 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
 	if (!ls->ls_dirtbl)
 		goto out_lkbfree;
 	for (i = 0; i < size; i++) {
-		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
+		ls->ls_dirtbl[i].root.rb_node = NULL;
 		spin_lock_init(&ls->ls_dirtbl[i].lock);
 	}
 
@@ -685,7 +685,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
 	struct dlm_rsb *rsb;
-	struct list_head *head;
+	struct rb_node *n = NULL;
 	int i, busy, rv;
 
 	busy = lockspace_busy(ls, force);
@@ -746,26 +746,22 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	 */
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		head = &ls->ls_rsbtbl[i].list;
-		while (!list_empty(head)) {
-			rsb = list_entry(head->next, struct dlm_rsb,
-					 res_hashchain);
-
-			list_del(&rsb->res_hashchain);
+		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].keep);
 			dlm_free_rsb(rsb);
 		}
 
-		head = &ls->ls_rsbtbl[i].toss;
-		while (!list_empty(head)) {
-			rsb = list_entry(head->next, struct dlm_rsb,
-					 res_hashchain);
-			list_del(&rsb->res_hashchain);
+		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].toss);
 			dlm_free_rsb(rsb);
 		}
 	}
 
 	vfree(ls->ls_rsbtbl);
 
+	/* Free up the new rsb list */
 	while (!list_empty(&ls->ls_new_rsb)) {
 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
 				       res_hashchain);
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index da64df7..81b4802 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -64,6 +64,8 @@ struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
 	struct dlm_rsb *r;
 
 	r = kmem_cache_zalloc(rsb_cache, GFP_NOFS);
+	if (r)
+		INIT_LIST_HEAD(&r->res_hashchain);
 	return r;
 }
 
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 1463823..ff5f7b4 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -715,6 +715,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
 
 int dlm_create_root_list(struct dlm_ls *ls)
 {
+	struct rb_node *n = NULL;
 	struct dlm_rsb *r;
 	int i, error = 0;
 
@@ -727,7 +728,9 @@ int dlm_create_root_list(struct dlm_ls *ls)
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n;
+		     n = rb_next(&r->res_hashnode)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, &ls->ls_root_list);
 			dlm_hold_rsb(r);
 		}
@@ -741,7 +744,9 @@ int dlm_create_root_list(struct dlm_ls *ls)
 			continue;
 		}
 
-		list_for_each_entry(r, &ls->ls_rsbtbl[i].toss, res_hashchain) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n;
+		     n = rb_next(&r->res_hashnode)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, &ls->ls_root_list);
 			dlm_hold_rsb(r);
 		}
@@ -771,16 +776,19 @@ void dlm_release_root_list(struct dlm_ls *ls)
 
 void dlm_clear_toss_list(struct dlm_ls *ls)
 {
-	struct dlm_rsb *r, *safe;
+	struct rb_node *node, *next = NULL;
+	struct dlm_rsb *rsb;
 	int i;
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss,
-					 res_hashchain) {
-			if (dlm_no_directory(ls) || !is_master(r)) {
-				list_del(&r->res_hashchain);
-				dlm_free_rsb(r);
+		for (node = rb_first(&ls->ls_rsbtbl[i].toss); node;
+		     node = next) {
+			rsb = rb_entry(node, struct dlm_rsb, res_hashnode);
+			next = rb_next(&rsb->res_hashnode);
+			if (dlm_no_directory(ls) || !is_master(rsb)) {
+				rb_erase(node, &ls->ls_rsbtbl[i].toss);
+				dlm_free_rsb(rsb);
 			}
 		}
 		spin_unlock(&ls->ls_rsbtbl[i].lock);
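
A couple of notes on the patch for reviewers:

Because rsb_cmp() zero-pads the lookup name to DLM_RESNAME_MAXLEN
before the memcmp (and res_name comes from a zeroed allocation), each
per-bucket tree gets one well-defined ordering; for example "ab" sorts
before "abc" because the padded zero byte compares less than 'c'.

On the union of res_hashchain and res_hashnode: a preallocated rsb
sits on the ls_new_rsb list with the list_head member of the union
live, and only when get_rsb_struct() takes it off that list is the
same storage reused as an rb_node, which is why the memset is done
before the rsb can be linked into a tree.  Below is a minimal sketch
of that pattern (illustrative only; "item", "pool_take" and so on are
made-up names, not DLM code):

	#include <linux/list.h>
	#include <linux/rbtree.h>
	#include <linux/string.h>

	struct item {
		int key;
		union {
			struct list_head list;	/* live while on the free pool */
			struct rb_node node;	/* live while in a lookup tree */
		};
	};

	/* Take an item off the pool list and prepare it for tree use. */
	static struct item *pool_take(struct list_head *pool)
	{
		struct item *it;

		if (list_empty(pool))
			return NULL;
		it = list_first_entry(pool, struct item, list);
		list_del(&it->list);
		/*
		 * The union bytes still hold the old list pointers; clear
		 * them so the rb_node starts out clean before it is handed
		 * to rb_link_node()/rb_insert_color().
		 */
		memset(&it->node, 0, sizeof(it->node));
		return it;
	}

Going the other direction, dlm_allocate_rsb() initializes the
list_head so a freshly allocated rsb can sit on ls_new_rsb before it
has ever been in a tree.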

