[Cluster-devel] [Upstream patch] DLM: Convert rsb data from linked list to rb_tree

Bob Peterson rpeterso at redhat.com
Wed Oct 5 19:25:39 UTC 2011


Hi,

This upstream patch changes the way DLM keeps track of RSBs.
Before, they were in a linked list off a hash table.  Now,
they're an rb_tree off the same hash table.  This speeds up
DLM lookups greatly.

Today's DLM is faster than older DLMs (e.g. in RHEL5) for
many file systems, due to its larger hash table size.  However,
this rb_tree implementation scales much better.  For my
1000-directories-with-1000-files test, the patch doesn't
show much of an improvement.  But when I scale the file system
to 4000 directories with 4000 files (16 million files), it
helps greatly. The time to do rm -fR /mnt/gfs2/* drops from
42.01 hours to 23.68 hours.

With this patch I believe we could also reduce the size of
the hash table again or eliminate it completely, but we can
evaluate and do that later.

NOTE: Today's upstream DLM code has special code to
pre-allocate RSB structures for faster lookup.  This patch
eliminates that step, since it doesn't have a resource name
at that time for inserting new entries in the rb_tree.

Regards,

Bob Peterson
Red Hat File Systems

Signed-off-by: Bob Peterson <rpeterso at redhat.com> 
--
commit 4f65e6021b3b5582c731542552914a3f6865f8a5
Author: Bob Peterson <rpeterso at redhat.com>
Date:   Wed Oct 5 10:04:39 2011 -0500

    DLM: Convert rsb data from linked list to rb_tree
    
    This patch changes the way DLM keeps track of rsbs.  Before, they
    were in a linked list off a hash table.  Now, they're an rb_tree
    off the same hash table.  This speeds up DLM lookups greatly.

diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 5977923..f2f4150 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -393,9 +393,10 @@ static const struct seq_operations format3_seq_ops;
 
 static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 {
+	struct rb_node *node = NULL;
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri;
-	struct dlm_rsb *r;
+	struct dlm_rsb *r = NULL;
 	loff_t n = *pos;
 	unsigned bucket, entry;
 
@@ -418,9 +419,10 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		ri->format = 3;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-		list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list,
-				    res_hashchain) {
+	if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+		node = rb_first(&ls->ls_rsbtbl[bucket].keep);
+		while (node) {
+			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			if (!entry--) {
 				dlm_hold_rsb(r);
 				ri->rsb = r;
@@ -428,6 +430,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 				spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 				return ri;
 			}
+			node = rb_next(&r->res_hashnode);
 		}
 	}
 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -449,9 +452,9 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		}
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-			r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-					     struct dlm_rsb, res_hashchain);
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+			node = rb_first(&ls->ls_rsbtbl[bucket].keep);
+			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
@@ -467,7 +469,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 {
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri = iter_ptr;
-	struct list_head *next;
+	struct rb_node *next;
 	struct dlm_rsb *r, *rp;
 	loff_t n = *pos;
 	unsigned bucket;
@@ -480,10 +482,10 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
 	rp = ri->rsb;
-	next = rp->res_hashchain.next;
+	next = rb_next(&rp->res_hashnode);
 
-	if (next != &ls->ls_rsbtbl[bucket].list) {
-		r = list_entry(next, struct dlm_rsb, res_hashchain);
+	if (next) {
+		r = rb_entry(next, struct dlm_rsb, res_hashnode);
 		dlm_hold_rsb(r);
 		ri->rsb = r;
 		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -511,9 +513,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 		}
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-			r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-					     struct dlm_rsb, res_hashchain);
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+			next = rb_first(&ls->ls_rsbtbl[bucket].keep);
+			r = rb_entry(next, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index fe2860c..fd7da74 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -103,8 +103,8 @@ struct dlm_dirtable {
 };
 
 struct dlm_rsbtable {
-	struct list_head	list;
-	struct list_head	toss;
+	struct rb_root		keep;
+	struct rb_root		toss;
 	spinlock_t		lock;
 };
 
@@ -285,7 +285,7 @@ struct dlm_rsb {
 	unsigned long		res_toss_time;
 	uint32_t		res_first_lkid;
 	struct list_head	res_lookup;	/* lkbs waiting on first */
-	struct list_head	res_hashchain;	/* rsbtbl */
+	struct rb_node		res_hashnode;	/* rsbtbl */
 	struct list_head	res_grantqueue;
 	struct list_head	res_convertqueue;
 	struct list_head	res_waitqueue;
@@ -479,10 +479,6 @@ struct dlm_ls {
 	struct mutex		ls_timeout_mutex;
 	struct list_head	ls_timeout;
 
-	spinlock_t		ls_new_rsb_spin;
-	int			ls_new_rsb_count;
-	struct list_head	ls_new_rsb;	/* new rsb structs */
-
 	struct list_head	ls_nodes;	/* current nodes in ls */
 	struct list_head	ls_nodes_gone;	/* dead node list, recovery */
 	int			ls_num_nodes;	/* number of nodes in ls */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 83b5e32..79789d1 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -56,6 +56,7 @@
    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
 */
 #include <linux/types.h>
+#include <linux/rbtree.h>
 #include <linux/slab.h>
 #include "dlm_internal.h"
 #include <linux/dlm_device.h>
@@ -327,38 +328,6 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
  * Basic operations on rsb's and lkb's
  */
 
-static int pre_rsb_struct(struct dlm_ls *ls)
-{
-	struct dlm_rsb *r1, *r2;
-	int count = 0;
-
-	spin_lock(&ls->ls_new_rsb_spin);
-	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
-		spin_unlock(&ls->ls_new_rsb_spin);
-		return 0;
-	}
-	spin_unlock(&ls->ls_new_rsb_spin);
-
-	r1 = dlm_allocate_rsb(ls);
-	r2 = dlm_allocate_rsb(ls);
-
-	spin_lock(&ls->ls_new_rsb_spin);
-	if (r1) {
-		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
-		ls->ls_new_rsb_count++;
-	}
-	if (r2) {
-		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
-		ls->ls_new_rsb_count++;
-	}
-	count = ls->ls_new_rsb_count;
-	spin_unlock(&ls->ls_new_rsb_spin);
-
-	if (!count)
-		return -ENOMEM;
-	return 0;
-}
-
 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
    unlock any spinlocks, go back and call pre_rsb_struct again.
    Otherwise, take an rsb off the list and return it. */
@@ -367,28 +336,16 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 			  struct dlm_rsb **r_ret)
 {
 	struct dlm_rsb *r;
-	int count;
 
-	spin_lock(&ls->ls_new_rsb_spin);
-	if (list_empty(&ls->ls_new_rsb)) {
-		count = ls->ls_new_rsb_count;
-		spin_unlock(&ls->ls_new_rsb_spin);
-		log_debug(ls, "find_rsb retry %d %d %s",
-			  count, dlm_config.ci_new_rsb_count, name);
-		return -EAGAIN;
-	}
-
-	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
-	list_del(&r->res_hashchain);
-	ls->ls_new_rsb_count--;
-	spin_unlock(&ls->ls_new_rsb_spin);
+	r = dlm_allocate_rsb(ls);
+	if (!r)
+		return -ENOMEM;
 
 	r->res_ls = ls;
 	r->res_length = len;
 	memcpy(r->res_name, name, len);
 	mutex_init(&r->res_mutex);
 
-	INIT_LIST_HEAD(&r->res_hashchain);
 	INIT_LIST_HEAD(&r->res_lookup);
 	INIT_LIST_HEAD(&r->res_grantqueue);
 	INIT_LIST_HEAD(&r->res_convertqueue);
@@ -400,42 +357,80 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 	return 0;
 }
 
-static int search_rsb_list(struct list_head *head, char *name, int len,
+static int search_rsb_tree(struct rb_root *tree, char *name, int len,
 			   unsigned int flags, struct dlm_rsb **r_ret)
 {
-	struct dlm_rsb *r;
-	int error = 0;
-
-	list_for_each_entry(r, head, res_hashchain) {
-		if (len == r->res_length && !memcmp(name, r->res_name, len))
+	struct rb_node **node = &tree->rb_node;
+	struct dlm_rsb *rsb = NULL;
+	int rc;
+
+	while (*node) {
+		rsb = rb_entry(*node, struct dlm_rsb, res_hashnode);
+		/* Order by length first, then content, so that a name
+		   never matches a resource it is merely a prefix of and
+		   memcmp never reads past the shorter key.  Must stay
+		   consistent with the ordering used in rsb_insert(). */
+		rc = len - rsb->res_length;
+		if (!rc)
+			rc = memcmp(name, rsb->res_name, len);
+		if (rc < 0)
+			node = &((*node)->rb_left);
+		else if (rc > 0)
+			node = &((*node)->rb_right);
+		else
 			goto found;
 	}
 	*r_ret = NULL;
 	return -EBADR;
 
  found:
-	if (r->res_nodeid && (flags & R_MASTER))
-		error = -ENOTBLK;
-	*r_ret = r;
-	return error;
+	*r_ret = rsb;
+	if (rsb->res_nodeid && (flags & R_MASTER))
+		return -ENOTBLK;
+
+	return 0;
+}
+
+static void rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
+{
+	struct rb_node **newn = &tree->rb_node, *parent = NULL;
+	int rc;
+	const char *name = rsb->res_name; /* This is just for clarity */
+
+	/* Figure out where to put new node */
+	while (*newn) {
+		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
+					       res_hashnode);
+
+		parent = *newn;
+		/* Same ordering as search_rsb_tree(): length first, then
+		   content over the (equal) length. */
+		rc = rsb->res_length - cur->res_length;
+		if (!rc)
+			rc = memcmp(name, cur->res_name, cur->res_length);
+		if (rc < 0)
+			newn = &((*newn)->rb_left);
+		else if (rc > 0)
+			newn = &((*newn)->rb_right);
+		else
+			return;
+	}
+
+	rb_link_node(&rsb->res_hashnode, parent, newn);
+	rb_insert_color(&rsb->res_hashnode, tree);
 }
 
 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 		       unsigned int flags, struct dlm_rsb **r_ret)
 {
-	struct dlm_rsb *r;
+	struct dlm_rsb *r = NULL;
 	int error;
 
-	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
+	error = search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
 	if (!error) {
 		kref_get(&r->res_ref);
 		goto out;
 	}
-	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
+	error = search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
 	if (error)
 		goto out;
 
-	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+	rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 
 	if (dlm_no_directory(ls))
 		goto out;
@@ -487,13 +478,6 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 	hash = jhash(name, namelen, 0);
 	bucket = hash & (ls->ls_rsbtbl_size - 1);
 
- retry:
-	if (flags & R_CREATE) {
-		error = pre_rsb_struct(ls);
-		if (error < 0)
-			goto out;
-	}
-
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
 
 	error = _search_rsb(ls, name, namelen, bucket, flags, &r);
@@ -508,10 +492,6 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 		goto out_unlock;
 
 	error = get_rsb_struct(ls, name, namelen, &r);
-	if (error == -EAGAIN) {
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
-		goto retry;
-	}
 	if (error)
 		goto out_unlock;
 
@@ -527,7 +507,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 			nodeid = 0;
 		r->res_nodeid = nodeid;
 	}
-	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
+	rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
 	error = 0;
  out_unlock:
 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -556,7 +536,8 @@ static void toss_rsb(struct kref *kref)
 
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 	kref_init(&r->res_ref);
-	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
+	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
 	r->res_toss_time = jiffies;
 	if (r->res_lvbptr) {
 		dlm_free_lvb(r->res_lvbptr);
@@ -1087,19 +1068,22 @@ static void dir_remove(struct dlm_rsb *r)
 
 static int shrink_bucket(struct dlm_ls *ls, int b)
 {
+	struct rb_node *n = NULL;
 	struct dlm_rsb *r;
 	int count = 0, found;
 
 	for (;;) {
 		found = 0;
 		spin_lock(&ls->ls_rsbtbl[b].lock);
-		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
-					    res_hashchain) {
-			if (!time_after_eq(jiffies, r->res_toss_time +
-					   dlm_config.ci_toss_secs * HZ))
-				continue;
-			found = 1;
-			break;
+		n = rb_first(&ls->ls_rsbtbl[b].toss);
+		while (n) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+			if (time_after_eq(jiffies, r->res_toss_time +
+					  dlm_config.ci_toss_secs * HZ)) {
+				found = 1;
+				break;
+			}
+			n = rb_next(&r->res_hashnode);
 		}
 
 		if (!found) {
@@ -1108,7 +1096,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
 		}
 
 		if (kref_put(&r->res_ref, kill_rsb)) {
-			list_del(&r->res_hashchain);
+			rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
 			spin_unlock(&ls->ls_rsbtbl[b].lock);
 
 			if (is_master(r))
@@ -4441,16 +4429,20 @@ int dlm_purge_locks(struct dlm_ls *ls)
 
 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
 {
+	struct rb_node *n = NULL;
 	struct dlm_rsb *r, *r_ret = NULL;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
-		if (!rsb_flag(r, RSB_LOCKS_PURGED))
-			continue;
-		hold_rsb(r);
-		rsb_clear_flag(r, RSB_LOCKS_PURGED);
-		r_ret = r;
-		break;
+	n = rb_first(&ls->ls_rsbtbl[bucket].keep);
+	while (n) {
+		r = rb_entry(n, struct dlm_rsb, res_hashnode);
+		if (rsb_flag(r, RSB_LOCKS_PURGED)) {
+			hold_rsb(r);
+			rsb_clear_flag(r, RSB_LOCKS_PURGED);
+			r_ret = r;
+			break;
+		}
+		n = rb_next(&r->res_hashnode);
 	}
 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 	return r_ret;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index a1d8f1a..0bee1c7 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -457,8 +457,8 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
 	if (!ls->ls_rsbtbl)
 		goto out_lsfree;
 	for (i = 0; i < size; i++) {
-		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
-		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
+		ls->ls_rsbtbl[i].keep.rb_node = NULL;
+		ls->ls_rsbtbl[i].toss.rb_node = NULL;
 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
 	}
 
@@ -483,9 +483,6 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
 	INIT_LIST_HEAD(&ls->ls_timeout);
 	mutex_init(&ls->ls_timeout_mutex);
 
-	INIT_LIST_HEAD(&ls->ls_new_rsb);
-	spin_lock_init(&ls->ls_new_rsb_spin);
-
 	INIT_LIST_HEAD(&ls->ls_nodes);
 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
 	ls->ls_num_nodes = 0;
@@ -685,7 +682,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
 	struct dlm_rsb *rsb;
-	struct list_head *head;
+	struct rb_node *n = NULL;
 	int i, busy, rv;
 
 	busy = lockspace_busy(ls, force);
@@ -746,33 +743,21 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	 */
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		head = &ls->ls_rsbtbl[i].list;
-		while (!list_empty(head)) {
-			rsb = list_entry(head->next, struct dlm_rsb,
-					 res_hashchain);
-
-			list_del(&rsb->res_hashchain);
+		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].keep);
 			dlm_free_rsb(rsb);
 		}
 
-		head = &ls->ls_rsbtbl[i].toss;
-		while (!list_empty(head)) {
-			rsb = list_entry(head->next, struct dlm_rsb,
-					 res_hashchain);
-			list_del(&rsb->res_hashchain);
+		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].toss);
 			dlm_free_rsb(rsb);
 		}
 	}
 
 	vfree(ls->ls_rsbtbl);
 
-	while (!list_empty(&ls->ls_new_rsb)) {
-		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
-				       res_hashchain);
-		list_del(&rsb->res_hashchain);
-		dlm_free_rsb(rsb);
-	}
-
 	/*
 	 * Free structures on any other lists
 	 */
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 1463823..65dff1b 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -715,6 +715,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
 
 int dlm_create_root_list(struct dlm_ls *ls)
 {
+	struct rb_node *n = NULL;
 	struct dlm_rsb *r;
 	int i, error = 0;
 
@@ -727,9 +728,12 @@ int dlm_create_root_list(struct dlm_ls *ls)
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
+		n = rb_first(&ls->ls_rsbtbl[i].keep);
+		while (n) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, &ls->ls_root_list);
 			dlm_hold_rsb(r);
+			n = rb_next(&r->res_hashnode);
 		}
 
 		/* If we're using a directory, add tossed rsbs to the root
@@ -741,9 +745,12 @@ int dlm_create_root_list(struct dlm_ls *ls)
 			continue;
 		}
 
-		list_for_each_entry(r, &ls->ls_rsbtbl[i].toss, res_hashchain) {
+		n = rb_first(&ls->ls_rsbtbl[i].toss);
+		while (n) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, &ls->ls_root_list);
 			dlm_hold_rsb(r);
+			n = rb_next(&r->res_hashnode);
 		}
 		spin_unlock(&ls->ls_rsbtbl[i].lock);
 	}
@@ -771,17 +778,21 @@ void dlm_release_root_list(struct dlm_ls *ls)
 
 void dlm_clear_toss_list(struct dlm_ls *ls)
 {
-	struct dlm_rsb *r, *safe;
+	struct rb_node *node, *next;
+	struct dlm_rsb *rsb;
 	int i;
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss,
-					 res_hashchain) {
-			if (dlm_no_directory(ls) || !is_master(r)) {
-				list_del(&r->res_hashchain);
-				dlm_free_rsb(r);
+		node = rb_first(&ls->ls_rsbtbl[i].toss);
+		while (node) {
+			rsb = rb_entry(node, struct dlm_rsb, res_hashnode);
+			next = rb_next(&rsb->res_hashnode);
+			if (dlm_no_directory(ls) || !is_master(rsb)) {
+				rb_erase(node, &ls->ls_rsbtbl[i].toss);
+				dlm_free_rsb(rsb);
 			}
+			node = next;
 		}
 		spin_unlock(&ls->ls_rsbtbl[i].lock);
 	}




More information about the Cluster-devel mailing list