[Cluster-devel] [PATCH 1/5] dlm: convert rsb list to rb_tree
Steven Whitehouse
swhiteho at redhat.com
Mon Dec 19 12:03:02 UTC 2011
Hi,
Looks good to me,
Steve.
On Fri, 2011-12-16 at 16:03 -0600, David Teigland wrote:
> From: Bob Peterson <rpeterso at redhat.com>
>
> Change the linked lists to rb_tree's in the rsb
> hash table to speed up searches. Slow rsb searches
> were having a large impact on gfs2 performance due
> to the large number of dlm locks gfs2 uses.
>
> Signed-off-by: Bob Peterson <rpeterso at redhat.com>
> Signed-off-by: David Teigland <teigland at redhat.com>
> ---
> fs/dlm/debug_fs.c | 28 ++++++++-------
> fs/dlm/dlm_internal.h | 9 +++--
> fs/dlm/lock.c | 87 +++++++++++++++++++++++++++++++++++++++---------
> fs/dlm/lockspace.c | 23 +++++--------
> fs/dlm/recover.c | 21 +++++++----
> 5 files changed, 113 insertions(+), 55 deletions(-)
>
> diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
> index 5977923..3dca2b3 100644
> --- a/fs/dlm/debug_fs.c
> +++ b/fs/dlm/debug_fs.c
> @@ -393,6 +393,7 @@ static const struct seq_operations format3_seq_ops;
>
> static void *table_seq_start(struct seq_file *seq, loff_t *pos)
> {
> + struct rb_node *node;
> struct dlm_ls *ls = seq->private;
> struct rsbtbl_iter *ri;
> struct dlm_rsb *r;
> @@ -418,9 +419,10 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
> ri->format = 3;
>
> spin_lock(&ls->ls_rsbtbl[bucket].lock);
> - if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
> - list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list,
> - res_hashchain) {
> + if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
> + for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node;
> + node = rb_next(node)) {
> + r = rb_entry(node, struct dlm_rsb, res_hashnode);
> if (!entry--) {
> dlm_hold_rsb(r);
> ri->rsb = r;
> @@ -449,9 +451,9 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
> }
>
> spin_lock(&ls->ls_rsbtbl[bucket].lock);
> - if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
> - r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
> - struct dlm_rsb, res_hashchain);
> + if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
> + node = rb_first(&ls->ls_rsbtbl[bucket].keep);
> + r = rb_entry(node, struct dlm_rsb, res_hashnode);
> dlm_hold_rsb(r);
> ri->rsb = r;
> ri->bucket = bucket;
> @@ -467,7 +469,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
> {
> struct dlm_ls *ls = seq->private;
> struct rsbtbl_iter *ri = iter_ptr;
> - struct list_head *next;
> + struct rb_node *next;
> struct dlm_rsb *r, *rp;
> loff_t n = *pos;
> unsigned bucket;
> @@ -480,10 +482,10 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
>
> spin_lock(&ls->ls_rsbtbl[bucket].lock);
> rp = ri->rsb;
> - next = rp->res_hashchain.next;
> + next = rb_next(&rp->res_hashnode);
>
> - if (next != &ls->ls_rsbtbl[bucket].list) {
> - r = list_entry(next, struct dlm_rsb, res_hashchain);
> + if (next) {
> + r = rb_entry(next, struct dlm_rsb, res_hashnode);
> dlm_hold_rsb(r);
> ri->rsb = r;
> spin_unlock(&ls->ls_rsbtbl[bucket].lock);
> @@ -511,9 +513,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
> }
>
> spin_lock(&ls->ls_rsbtbl[bucket].lock);
> - if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
> - r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
> - struct dlm_rsb, res_hashchain);
> + if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
> + next = rb_first(&ls->ls_rsbtbl[bucket].keep);
> + r = rb_entry(next, struct dlm_rsb, res_hashnode);
> dlm_hold_rsb(r);
> ri->rsb = r;
> ri->bucket = bucket;
> diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
> index fe2860c..5685a9a 100644
> --- a/fs/dlm/dlm_internal.h
> +++ b/fs/dlm/dlm_internal.h
> @@ -103,8 +103,8 @@ struct dlm_dirtable {
> };
>
> struct dlm_rsbtable {
> - struct list_head list;
> - struct list_head toss;
> + struct rb_root keep;
> + struct rb_root toss;
> spinlock_t lock;
> };
>
> @@ -285,7 +285,10 @@ struct dlm_rsb {
> unsigned long res_toss_time;
> uint32_t res_first_lkid;
> struct list_head res_lookup; /* lkbs waiting on first */
> - struct list_head res_hashchain; /* rsbtbl */
> + union {
> + struct list_head res_hashchain;
> + struct rb_node res_hashnode; /* rsbtbl */
> + };
> struct list_head res_grantqueue;
> struct list_head res_convertqueue;
> struct list_head res_waitqueue;
> diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
> index 83b5e32..d471830 100644
> --- a/fs/dlm/lock.c
> +++ b/fs/dlm/lock.c
> @@ -56,6 +56,7 @@
> L: receive_xxxx_reply() <- R: send_xxxx_reply()
> */
> #include <linux/types.h>
> +#include <linux/rbtree.h>
> #include <linux/slab.h>
> #include "dlm_internal.h"
> #include <linux/dlm_device.h>
> @@ -380,6 +381,8 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
>
> r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
> list_del(&r->res_hashchain);
> + /* Convert the empty list_head to a NULL rb_node for tree usage: */
> + memset(&r->res_hashnode, 0, sizeof(struct rb_node));
> ls->ls_new_rsb_count--;
> spin_unlock(&ls->ls_new_rsb_spin);
>
> @@ -388,7 +391,6 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
> memcpy(r->res_name, name, len);
> mutex_init(&r->res_mutex);
>
> - INIT_LIST_HEAD(&r->res_hashchain);
> INIT_LIST_HEAD(&r->res_lookup);
> INIT_LIST_HEAD(&r->res_grantqueue);
> INIT_LIST_HEAD(&r->res_convertqueue);
> @@ -400,14 +402,31 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
> return 0;
> }
>
> -static int search_rsb_list(struct list_head *head, char *name, int len,
> +static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
> +{
> + char maxname[DLM_RESNAME_MAXLEN];
> +
> + memset(maxname, 0, DLM_RESNAME_MAXLEN);
> + memcpy(maxname, name, nlen);
> + return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
> +}
> +
> +static int search_rsb_tree(struct rb_root *tree, char *name, int len,
> unsigned int flags, struct dlm_rsb **r_ret)
> {
> + struct rb_node *node = tree->rb_node;
> struct dlm_rsb *r;
> int error = 0;
> -
> - list_for_each_entry(r, head, res_hashchain) {
> - if (len == r->res_length && !memcmp(name, r->res_name, len))
> + int rc;
> +
> + while (node) {
> + r = rb_entry(node, struct dlm_rsb, res_hashnode);
> + rc = rsb_cmp(r, name, len);
> + if (rc < 0)
> + node = node->rb_left;
> + else if (rc > 0)
> + node = node->rb_right;
> + else
> goto found;
> }
> *r_ret = NULL;
> @@ -420,22 +439,54 @@ static int search_rsb_list(struct list_head *head, char *name, int len,
> return error;
> }
>
> +static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
> +{
> + struct rb_node **newn = &tree->rb_node;
> + struct rb_node *parent = NULL;
> + int rc;
> +
> + while (*newn) {
> + struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
> + res_hashnode);
> +
> + parent = *newn;
> + rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
> + if (rc < 0)
> + newn = &parent->rb_left;
> + else if (rc > 0)
> + newn = &parent->rb_right;
> + else {
> + log_print("rsb_insert match");
> + dlm_dump_rsb(rsb);
> + dlm_dump_rsb(cur);
> + return -EEXIST;
> + }
> + }
> +
> + rb_link_node(&rsb->res_hashnode, parent, newn);
> + rb_insert_color(&rsb->res_hashnode, tree);
> + return 0;
> +}
> +
> static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
> unsigned int flags, struct dlm_rsb **r_ret)
> {
> struct dlm_rsb *r;
> int error;
>
> - error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
> + error = search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
> if (!error) {
> kref_get(&r->res_ref);
> goto out;
> }
> - error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
> + error = search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
> if (error)
> goto out;
>
> - list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
> + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
> + error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
> + if (error)
> + return error;
>
> if (dlm_no_directory(ls))
> goto out;
> @@ -527,8 +578,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
> nodeid = 0;
> r->res_nodeid = nodeid;
> }
> - list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
> - error = 0;
> + error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
> out_unlock:
> spin_unlock(&ls->ls_rsbtbl[bucket].lock);
> out:
> @@ -556,7 +606,8 @@ static void toss_rsb(struct kref *kref)
>
> DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
> kref_init(&r->res_ref);
> - list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
> + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
> + rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
> r->res_toss_time = jiffies;
> if (r->res_lvbptr) {
> dlm_free_lvb(r->res_lvbptr);
> @@ -1082,19 +1133,19 @@ static void dir_remove(struct dlm_rsb *r)
> r->res_name, r->res_length);
> }
>
> -/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
> - found since they are in order of newest to oldest? */
> +/* FIXME: make this more efficient */
>
> static int shrink_bucket(struct dlm_ls *ls, int b)
> {
> + struct rb_node *n;
> struct dlm_rsb *r;
> int count = 0, found;
>
> for (;;) {
> found = 0;
> spin_lock(&ls->ls_rsbtbl[b].lock);
> - list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
> - res_hashchain) {
> + for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
> + r = rb_entry(n, struct dlm_rsb, res_hashnode);
> if (!time_after_eq(jiffies, r->res_toss_time +
> dlm_config.ci_toss_secs * HZ))
> continue;
> @@ -1108,7 +1159,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
> }
>
> if (kref_put(&r->res_ref, kill_rsb)) {
> - list_del(&r->res_hashchain);
> + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
> spin_unlock(&ls->ls_rsbtbl[b].lock);
>
> if (is_master(r))
> @@ -4441,10 +4492,12 @@ int dlm_purge_locks(struct dlm_ls *ls)
>
> static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
> {
> + struct rb_node *n;
> struct dlm_rsb *r, *r_ret = NULL;
>
> spin_lock(&ls->ls_rsbtbl[bucket].lock);
> - list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
> + for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
> + r = rb_entry(n, struct dlm_rsb, res_hashnode);
> if (!rsb_flag(r, RSB_LOCKS_PURGED))
> continue;
> hold_rsb(r);
> diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
> index a1d8f1a..1d16a23 100644
> --- a/fs/dlm/lockspace.c
> +++ b/fs/dlm/lockspace.c
> @@ -457,8 +457,8 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
> if (!ls->ls_rsbtbl)
> goto out_lsfree;
> for (i = 0; i < size; i++) {
> - INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
> - INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
> + ls->ls_rsbtbl[i].keep.rb_node = NULL;
> + ls->ls_rsbtbl[i].toss.rb_node = NULL;
> spin_lock_init(&ls->ls_rsbtbl[i].lock);
> }
>
> @@ -685,7 +685,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
> static int release_lockspace(struct dlm_ls *ls, int force)
> {
> struct dlm_rsb *rsb;
> - struct list_head *head;
> + struct rb_node *n;
> int i, busy, rv;
>
> busy = lockspace_busy(ls, force);
> @@ -746,20 +746,15 @@ static int release_lockspace(struct dlm_ls *ls, int force)
> */
>
> for (i = 0; i < ls->ls_rsbtbl_size; i++) {
> - head = &ls->ls_rsbtbl[i].list;
> - while (!list_empty(head)) {
> - rsb = list_entry(head->next, struct dlm_rsb,
> - res_hashchain);
> -
> - list_del(&rsb->res_hashchain);
> + while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
> + rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
> + rb_erase(n, &ls->ls_rsbtbl[i].keep);
> dlm_free_rsb(rsb);
> }
>
> - head = &ls->ls_rsbtbl[i].toss;
> - while (!list_empty(head)) {
> - rsb = list_entry(head->next, struct dlm_rsb,
> - res_hashchain);
> - list_del(&rsb->res_hashchain);
> + while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
> + rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
> + rb_erase(n, &ls->ls_rsbtbl[i].toss);
> dlm_free_rsb(rsb);
> }
> }
> diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
> index 1463823..50467ce 100644
> --- a/fs/dlm/recover.c
> +++ b/fs/dlm/recover.c
> @@ -715,6 +715,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
>
> int dlm_create_root_list(struct dlm_ls *ls)
> {
> + struct rb_node *n;
> struct dlm_rsb *r;
> int i, error = 0;
>
> @@ -727,7 +728,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
>
> for (i = 0; i < ls->ls_rsbtbl_size; i++) {
> spin_lock(&ls->ls_rsbtbl[i].lock);
> - list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
> + for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
> + r = rb_entry(n, struct dlm_rsb, res_hashnode);
> list_add(&r->res_root_list, &ls->ls_root_list);
> dlm_hold_rsb(r);
> }
> @@ -741,7 +743,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
> continue;
> }
>
> - list_for_each_entry(r, &ls->ls_rsbtbl[i].toss, res_hashchain) {
> + for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
> + r = rb_entry(n, struct dlm_rsb, res_hashnode);
> list_add(&r->res_root_list, &ls->ls_root_list);
> dlm_hold_rsb(r);
> }
> @@ -771,16 +774,18 @@ void dlm_release_root_list(struct dlm_ls *ls)
>
> void dlm_clear_toss_list(struct dlm_ls *ls)
> {
> - struct dlm_rsb *r, *safe;
> + struct rb_node *n, *next;
> + struct dlm_rsb *rsb;
> int i;
>
> for (i = 0; i < ls->ls_rsbtbl_size; i++) {
> spin_lock(&ls->ls_rsbtbl[i].lock);
> - list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss,
> - res_hashchain) {
> - if (dlm_no_directory(ls) || !is_master(r)) {
> - list_del(&r->res_hashchain);
> - dlm_free_rsb(r);
> + for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
> + next = rb_next(n);
> + rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
> + if (dlm_no_directory(ls) || !is_master(rsb)) {
> + rb_erase(n, &ls->ls_rsbtbl[i].toss);
> + dlm_free_rsb(rsb);
> }
> }
> spin_unlock(&ls->ls_rsbtbl[i].lock);
More information about the Cluster-devel
mailing list