[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Linux-cluster] [PATCH 4/8] dlm: recovery



When a node is removed from a lockspace, recovery is required for that
lockspace on all the remaining lockspace members.  Recovery involves: a
full rebuild of the distributed resource directory, selecting a new master
node for locks/resources previously mastered on the removed node, and
rebuilding master-copy locks on newly selected masters.

Signed-off-by: Dave Teigland <teigland redhat com>
Signed-off-by: Patrick Caulfield <pcaulfie redhat com>

---

 drivers/dlm/member.c       |  356 +++++++++++++++++++++
 drivers/dlm/member.h       |   29 +
 drivers/dlm/rcom.c         |  466 ++++++++++++++++++++++++++++
 drivers/dlm/rcom.h         |   24 +
 drivers/dlm/recover.c      |  729 +++++++++++++++++++++++++++++++++++++++++++++
 drivers/dlm/recover.h      |   32 +
 drivers/dlm/recoverd.c     |  714 ++++++++++++++++++++++++++++++++++++++++++++
 drivers/dlm/recoverd.h     |   24 +
 drivers/dlm/requestqueue.c |  144 ++++++++
 drivers/dlm/requestqueue.h |   22 +
 10 files changed, 2540 insertions(+)

--- a/drivers/dlm/rcom.c	1970-01-01 07:30:00.000000000 +0730
+++ b/drivers/dlm/rcom.c	2005-05-12 23:13:15.830485360 +0800
@@ -0,0 +1,466 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
+**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include "dlm_internal.h"
+#include "lockspace.h"
+#include "member.h"
+#include "lowcomms.h"
+#include "midcomms.h"
+#include "rcom.h"
+#include "recover.h"
+#include "dir.h"
+#include "config.h"
+#include "memory.h"
+#include "lock.h"
+#include "util.h"
+
+
+static int rcom_response(struct dlm_ls *ls)
+{
+	return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
+}
+
+static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
+		       struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
+{
+	struct dlm_rcom *rc;
+	struct dlm_mhandle *mh;
+	char *mb;
+	int mb_len = sizeof(struct dlm_rcom) + len;
+
+	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
+	if (!mh) {
+		log_print("create_rcom to %d type %d len %d ENOBUFS",
+			  to_nodeid, type, len);
+		return -ENOBUFS;
+	}
+	memset(mb, 0, mb_len);
+
+	rc = (struct dlm_rcom *) mb;
+
+	rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
+	rc->rc_header.h_lockspace = ls->ls_global_id;
+	rc->rc_header.h_nodeid = dlm_our_nodeid();
+	rc->rc_header.h_length = mb_len;
+	rc->rc_header.h_cmd = DLM_RCOM;
+
+	rc->rc_type = type;
+
+	*mh_ret = mh;
+	*rc_ret = rc;
+	return 0;
+}
+
+static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
+		      struct dlm_rcom *rc)
+{
+	dlm_rcom_out(rc);
+	dlm_lowcomms_commit_buffer(mh);
+}
+
+/* When replying to a status request, a node also sends back its
+   configuration values.  The requesting node then checks that the remote
+   node is configured the same way as itself. */
+
+static void make_config(struct dlm_ls *ls, struct rcom_config *rf)
+{
+	rf->rf_lvblen = ls->ls_lvblen;
+}
+
+static int check_config(struct dlm_ls *ls, struct rcom_config *rf)
+{
+	if (rf->rf_lvblen != ls->ls_lvblen)
+		return -EINVAL;
+	return 0;
+}
+
+static int make_status(struct dlm_ls *ls)
+{
+	int status = 0;
+
+	if (test_bit(LSFL_NODES_VALID, &ls->ls_flags))
+		status |= NODES_VALID;
+
+	if (test_bit(LSFL_ALL_NODES_VALID, &ls->ls_flags))
+		status |= NODES_ALL_VALID;
+
+	if (test_bit(LSFL_DIR_VALID, &ls->ls_flags))
+		status |= DIR_VALID;
+
+	if (test_bit(LSFL_ALL_DIR_VALID, &ls->ls_flags))
+		status |= DIR_ALL_VALID;
+
+	if (test_bit(LSFL_LOCKS_VALID, &ls->ls_flags))
+		status |= LOCKS_VALID;
+
+	if (test_bit(LSFL_ALL_LOCKS_VALID, &ls->ls_flags))
+		status |= LOCKS_ALL_VALID;
+
+	return status;
+}
+
+int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
+{
+	struct dlm_rcom *rc;
+	struct dlm_mhandle *mh;
+	int error = 0;
+
+	memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
+
+	if (nodeid == dlm_our_nodeid()) {
+		rc = (struct dlm_rcom *) ls->ls_recover_buf;
+		rc->rc_result = make_status(ls);
+		goto out;
+	}
+
+	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh);
+	if (error)
+		goto out;
+
+	send_rcom(ls, mh, rc);
+
+	error = dlm_wait_function(ls, &rcom_response);
+	clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
+	if (error)
+		goto out;
+
+	rc = (struct dlm_rcom *) ls->ls_recover_buf;
+	error = check_config(ls, (struct rcom_config *) rc->rc_buf);
+ out:
+	return error;
+}
+
+static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+{
+	struct dlm_rcom *rc;
+	struct dlm_mhandle *mh;
+	int error, nodeid = rc_in->rc_header.h_nodeid;
+
+	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
+			    sizeof(struct rcom_config), &rc, &mh);
+	if (error)
+		return;
+	rc->rc_result = make_status(ls);
+	make_config(ls, (struct rcom_config *) rc->rc_buf);
+
+	send_rcom(ls, mh, rc);
+}
+
+static void receive_rcom_status_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+{
+	memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
+	set_bit(LSFL_RCOM_READY, &ls->ls_flags);
+	wake_up(&ls->ls_wait_general);
+}
+
+int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
+{
+	struct dlm_rcom *rc;
+	struct dlm_mhandle *mh;
+	int error = 0, len = sizeof(struct dlm_rcom);
+
+	memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
+
+	if (nodeid == dlm_our_nodeid()) {
+		dlm_copy_master_names(ls, last_name, last_len,
+		                      ls->ls_recover_buf + len,
+		                      dlm_config.buffer_size - len, nodeid);
+		goto out;
+	}
+
+	error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
+	if (error)
+		goto out;
+	memcpy(rc->rc_buf, last_name, last_len);
+
+	send_rcom(ls, mh, rc);
+
+	error = dlm_wait_function(ls, &rcom_response);
+	clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
+ out:
+	return error;
+}
+
+static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+{
+	struct dlm_rcom *rc;
+	struct dlm_mhandle *mh;
+	int error, inlen, outlen;
+	int nodeid = rc_in->rc_header.h_nodeid;
+
+	/*
+	 * We can't run dlm_dir_rebuild_send (which uses ls_nodes) while
+	 * dlm_recoverd is running ls_nodes_reconfig (which changes ls_nodes).
+	 * It could only happen in rare cases where we get a late NAMES
+	 * message from a previous instance of recovery.
+	 */
+
+	if (!test_bit(LSFL_NODES_VALID, &ls->ls_flags)) {
+		log_debug(ls, "ignoring RCOM_NAMES from %u", nodeid);
+		return;
+	}
+		
+	nodeid = rc_in->rc_header.h_nodeid;
+	inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
+	outlen = dlm_config.buffer_size - sizeof(struct dlm_rcom);
+
+	error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
+	if (error)
+		return;
+
+	dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
+			      nodeid);
+	send_rcom(ls, mh, rc);
+}
+
+static void receive_rcom_names_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+{
+	memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
+	set_bit(LSFL_RCOM_READY, &ls->ls_flags);
+	wake_up(&ls->ls_wait_general);
+}
+
+int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
+{
+	struct dlm_rcom *rc;
+	struct dlm_mhandle *mh;
+	struct dlm_ls *ls = r->res_ls;
+	int error;
+
+	error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
+			    &rc, &mh);
+	if (error)
+		goto out;
+	memcpy(rc->rc_buf, r->res_name, r->res_length);
+	rc->rc_id = (unsigned long) r;
+
+	send_rcom(ls, mh, rc);
+ out:
+	return error;
+}
+
+static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+{
+	struct dlm_rcom *rc;
+	struct dlm_mhandle *mh;
+	int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
+	int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
+
+	error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
+	if (error)
+		return;
+
+	error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid);
+	if (error)
+		ret_nodeid = error;
+	rc->rc_result = ret_nodeid;
+	rc->rc_id = rc_in->rc_id;
+
+	send_rcom(ls, mh, rc);
+}
+
+static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+{
+	dlm_recover_master_reply(ls, rc_in);
+}
+
+static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
+			   struct rcom_lock *rl)
+{
+	memset(rl, 0, sizeof(*rl));
+
+	rl->rl_ownpid = lkb->lkb_ownpid;
+	rl->rl_lkid = lkb->lkb_id;
+	rl->rl_exflags = lkb->lkb_exflags;
+	rl->rl_flags = lkb->lkb_flags;
+	rl->rl_lvbseq = lkb->lkb_lvbseq;
+	rl->rl_rqmode = lkb->lkb_rqmode;
+	rl->rl_grmode = lkb->lkb_grmode;
+	rl->rl_status = lkb->lkb_status;
+	rl->rl_wait_type = lkb->lkb_wait_type;
+
+	if (lkb->lkb_bastaddr)
+		rl->rl_asts |= AST_BAST;
+	if (lkb->lkb_astaddr)
+		rl->rl_asts |= AST_COMP;
+
+	if (lkb->lkb_range)
+		memcpy(rl->rl_range, lkb->lkb_range, 4*sizeof(uint64_t));
+
+	rl->rl_namelen = r->res_length;
+	memcpy(rl->rl_name, r->res_name, r->res_length);
+
+	/* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
+	   If so, receive_rcom_lock_args() won't take this copy. */
+
+	if (lkb->lkb_lvbptr)
+		memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
+}
+
+int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+	struct dlm_ls *ls = r->res_ls;
+	struct dlm_rcom *rc;
+	struct dlm_mhandle *mh;
+	struct rcom_lock *rl;
+	int error, len = sizeof(struct rcom_lock);
+
+	if (lkb->lkb_lvbptr)
+		len += ls->ls_lvblen;
+
+	error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
+	if (error)
+		goto out;
+
+	rl = (struct rcom_lock *) rc->rc_buf;
+	pack_rcom_lock(r, lkb, rl);
+	rc->rc_id = (unsigned long) r;
+
+	send_rcom(ls, mh, rc);
+ out:
+	return error;
+}
+
+static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+{
+	struct dlm_rcom *rc;
+	struct dlm_mhandle *mh;
+	int error, nodeid = rc_in->rc_header.h_nodeid;
+
+	dlm_recover_master_copy(ls, rc_in);
+
+	error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
+			    sizeof(struct rcom_lock), &rc, &mh);
+	if (error)
+		return;
+
+	/* We send back the same rcom_lock struct we received, but
+	   dlm_recover_master_copy() has filled in rl_remid and rl_result */
+
+	memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
+	rc->rc_id = rc_in->rc_id;
+
+	send_rcom(ls, mh, rc);
+}
+
+static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+{
+	if (!test_bit(LSFL_DIR_VALID, &ls->ls_flags)) {
+		log_debug(ls, "ignoring RCOM_LOCK_REPLY from %u",
+			  rc_in->rc_header.h_nodeid);
+		return;
+	}
+
+	dlm_recover_process_copy(ls, rc_in);
+}
+
+static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
+{
+	struct dlm_rcom *rc;
+	struct dlm_mhandle *mh;
+	char *mb;
+	int mb_len = sizeof(struct dlm_rcom);
+
+	mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_KERNEL, &mb);
+	if (!mh)
+		return -ENOBUFS;
+	memset(mb, 0, mb_len);
+
+	rc = (struct dlm_rcom *) mb;
+
+	rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
+	rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
+	rc->rc_header.h_nodeid = dlm_our_nodeid();
+	rc->rc_header.h_length = mb_len;
+	rc->rc_header.h_cmd = DLM_RCOM;
+
+	rc->rc_type = DLM_RCOM_STATUS_REPLY;
+	rc->rc_result = 0;
+
+	dlm_rcom_out(rc);
+	dlm_lowcomms_commit_buffer(mh);
+
+	return 0;
+}
+
+/* Called by dlm_recvd; corresponds to dlm_receive_message() but special
+   recovery-only comms are sent through here. */
+
+void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
+{
+	struct dlm_rcom *rc = (struct dlm_rcom *) hd;
+	struct dlm_ls *ls;
+
+	dlm_rcom_in(rc);
+
+	/* If the lockspace doesn't exist then still send a status message
+	   back; it's possible that it just doesn't have its global_id yet. */
+
+	ls = dlm_find_lockspace_global(hd->h_lockspace);
+	if (!ls) {
+		send_ls_not_ready(nodeid, rc);
+		return;
+	}
+
+	if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
+		log_error(ls, "ignoring recovery message %x from %d",
+			  rc->rc_type, nodeid);
+		goto out;
+	}
+
+	if (nodeid != rc->rc_header.h_nodeid) {
+		log_error(ls, "bad rcom nodeid %d from %d",
+			  rc->rc_header.h_nodeid, nodeid);
+		goto out;
+	}
+
+	switch (rc->rc_type) {
+	case DLM_RCOM_STATUS:
+		receive_rcom_status(ls, rc);
+		break;
+
+	case DLM_RCOM_NAMES:
+		receive_rcom_names(ls, rc);
+		break;
+
+	case DLM_RCOM_LOOKUP:
+		receive_rcom_lookup(ls, rc);
+		break;
+
+	case DLM_RCOM_LOCK:
+		receive_rcom_lock(ls, rc);
+		break;
+
+	case DLM_RCOM_STATUS_REPLY:
+		receive_rcom_status_reply(ls, rc);
+		break;
+
+	case DLM_RCOM_NAMES_REPLY:
+		receive_rcom_names_reply(ls, rc);
+		break;
+
+	case DLM_RCOM_LOOKUP_REPLY:
+		receive_rcom_lookup_reply(ls, rc);
+		break;
+
+	case DLM_RCOM_LOCK_REPLY:
+		receive_rcom_lock_reply(ls, rc);
+		break;
+
+	default:
+		DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type););
+	}
+ out:
+	dlm_put_lockspace(ls);
+}
+
--- a/drivers/dlm/rcom.h	1970-01-01 07:30:00.000000000 +0730
+++ b/drivers/dlm/rcom.h	2005-05-12 23:13:15.830485360 +0800
@@ -0,0 +1,24 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
+**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#ifndef __RCOM_DOT_H__
+#define __RCOM_DOT_H__
+
+int dlm_rcom_status(struct dlm_ls *ls, int nodeid);
+int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
+int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
+int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
+void dlm_receive_rcom(struct dlm_header *hd, int nodeid);
+
+#endif
+
--- a/drivers/dlm/recover.c	1970-01-01 07:30:00.000000000 +0730
+++ b/drivers/dlm/recover.c	2005-05-12 23:13:15.830485360 +0800
@@ -0,0 +1,729 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
+**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include "dlm_internal.h"
+#include "lockspace.h"
+#include "dir.h"
+#include "config.h"
+#include "ast.h"
+#include "memory.h"
+#include "rcom.h"
+#include "lock.h"
+#include "lowcomms.h"
+#include "member.h"
+
+static struct timer_list dlm_timer;
+
+
+/*
+ * Recovery waiting routines: these functions wait for a particular reply from
+ * a remote node, or for the remote node to report a certain status.  They need
+ * to abort if the lockspace is stopped indicating a node has failed (perhaps
+ * the one being waited for).
+ */
+
+int dlm_recovery_stopped(struct dlm_ls *ls)
+{
+	return test_bit(LSFL_LS_STOP, &ls->ls_flags);
+}
+
+/*
+ * Wait until given function returns non-zero or lockspace is stopped (LS_STOP
+ * set due to failure of a node in ls_nodes).  When another function thinks it
+ * could have completed the waited-on task, they should wake up ls_wait_general
+ * to get an immediate response rather than waiting for the timer to detect the
+ * result.  A timer wakes us up periodically while waiting to see if we should
+ * abort due to a node failure.  This should only be called by the dlm_recoverd
+ * thread.
+ */
+
+static void dlm_wait_timer_fn(unsigned long data)
+{
+	struct dlm_ls *ls = (struct dlm_ls *) data;
+	mod_timer(&dlm_timer, jiffies + (dlm_config.recover_timer * HZ));
+	wake_up(&ls->ls_wait_general);
+}
+
+int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
+{
+	int error = 0;
+
+	init_timer(&dlm_timer);
+	dlm_timer.function = dlm_wait_timer_fn;
+	dlm_timer.data = (long) ls;
+	dlm_timer.expires = jiffies + (dlm_config.recover_timer * HZ);
+	add_timer(&dlm_timer);
+
+	wait_event(ls->ls_wait_general, testfn(ls) || dlm_recovery_stopped(ls));
+	del_timer_sync(&dlm_timer);
+
+	if (dlm_recovery_stopped(ls))
+		error = -EINTR;
+	return error;
+}
+
+/*
+ * An efficient way for all nodes to wait for all others to have a certain
+ * status.  The node with the lowest nodeid polls all the others for their
+ * status (dlm_wait_status_all) and all the others poll the node with the low
+ * id for its accumulated result (dlm_wait_status_low).
+ */
+
+static int dlm_wait_status_all(struct dlm_ls *ls, unsigned int wait_status)
+{
+	struct dlm_rcom *rc = (struct dlm_rcom *) ls->ls_recover_buf;
+	struct dlm_member *memb;
+	int error = 0, delay;
+
+	list_for_each_entry(memb, &ls->ls_nodes, list) {
+		delay = 0;
+		for (;;) {
+			error = dlm_recovery_stopped(ls);
+			if (error)
+				goto out;
+
+			error = dlm_rcom_status(ls, memb->nodeid);
+			if (error)
+				goto out;
+
+			if (rc->rc_result & wait_status)
+				break;
+			if (delay < 1000)
+				delay += 20;
+			msleep(delay);
+		}
+	}
+ out:
+	return error;
+}
+
+static int dlm_wait_status_low(struct dlm_ls *ls, unsigned int wait_status)
+{
+	struct dlm_rcom *rc = (struct dlm_rcom *) ls->ls_recover_buf;
+	int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
+
+	for (;;) {
+		error = dlm_recovery_stopped(ls);
+		if (error)
+			goto out;
+
+		error = dlm_rcom_status(ls, nodeid);
+		if (error)
+			break;
+
+		if (rc->rc_result & wait_status)
+			break;
+		if (delay < 1000)
+			delay += 20;
+		msleep(delay);
+	}
+ out:
+	return error;
+}
+
+int dlm_recover_members_wait(struct dlm_ls *ls)
+{
+	int error;
+
+	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
+		error = dlm_wait_status_all(ls, NODES_VALID);
+		if (!error)
+			set_bit(LSFL_ALL_NODES_VALID, &ls->ls_flags);
+	} else
+		error = dlm_wait_status_low(ls, NODES_ALL_VALID);
+
+	return error;
+}
+
+int dlm_recover_directory_wait(struct dlm_ls *ls)
+{
+	int error;
+
+	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
+		error = dlm_wait_status_all(ls, DIR_VALID);
+		if (!error)
+			set_bit(LSFL_ALL_DIR_VALID, &ls->ls_flags);
+	} else
+		error = dlm_wait_status_low(ls, DIR_ALL_VALID);
+
+	return error;
+}
+
+int dlm_recover_locks_wait(struct dlm_ls *ls)
+{
+	int error;
+
+	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
+		error = dlm_wait_status_all(ls, LOCKS_VALID);
+		if (!error)
+			set_bit(LSFL_ALL_LOCKS_VALID, &ls->ls_flags);
+	} else
+		error = dlm_wait_status_low(ls, LOCKS_ALL_VALID);
+
+	return error;
+}
+
+/*
+ * The recover_list contains all the rsb's for which we've requested the new
+ * master nodeid.  As replies are returned from the resource directories the
+ * rsb's are removed from the list.  When the list is empty we're done.
+ *
+ * The recover_list is later similarly used for all rsb's for which we've sent
+ * new lkb's and need to receive new corresponding lkid's.
+ *
+ * We use the address of the rsb struct as a simple local identifier for the
+ * rsb so we can match an rcom reply with the rsb it was sent for.
+ */
+
+static int recover_list_empty(struct dlm_ls *ls)
+{
+	int empty;
+
+	spin_lock(&ls->ls_recover_list_lock);
+	empty = list_empty(&ls->ls_recover_list);
+	spin_unlock(&ls->ls_recover_list_lock);
+
+	return empty;
+}
+
+static void recover_list_add(struct dlm_rsb *r)
+{
+	struct dlm_ls *ls = r->res_ls;
+
+	spin_lock(&ls->ls_recover_list_lock);
+	if (list_empty(&r->res_recover_list)) {
+		list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
+		ls->ls_recover_list_count++;
+		dlm_hold_rsb(r);
+	}
+	spin_unlock(&ls->ls_recover_list_lock);
+}
+
+static void recover_list_del(struct dlm_rsb *r)
+{
+	struct dlm_ls *ls = r->res_ls;
+
+	spin_lock(&ls->ls_recover_list_lock);
+	list_del_init(&r->res_recover_list);
+	ls->ls_recover_list_count--;
+	spin_unlock(&ls->ls_recover_list_lock);
+
+	dlm_put_rsb(r);
+}
+
+static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id)
+{
+	struct dlm_rsb *r = NULL;
+
+	spin_lock(&ls->ls_recover_list_lock);
+
+	list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) {
+		if (id == (unsigned long) r)
+			goto out;
+	}
+	r = NULL;
+ out:
+	spin_unlock(&ls->ls_recover_list_lock);
+	return r;
+}
+
+void recover_list_clear(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r, *s;
+
+	spin_lock(&ls->ls_recover_list_lock);
+	list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
+		list_del_init(&r->res_recover_list);
+		dlm_put_rsb(r);
+		ls->ls_recover_list_count--;
+	}
+
+	if (ls->ls_recover_list_count != 0) {
+		log_error(ls, "warning: recover_list_count %d",
+			  ls->ls_recover_list_count);
+		ls->ls_recover_list_count = 0;
+	}
+	spin_unlock(&ls->ls_recover_list_lock);
+}
+
+
+/* Master recovery: find new master node for rsb's that were
+   mastered on nodes that have been removed.
+
+   dlm_recover_masters
+   recover_master
+   dlm_send_rcom_lookup            ->  receive_rcom_lookup
+                                       dlm_dir_lookup
+   receive_rcom_lookup_reply       <-
+   dlm_recover_master_reply
+   set_new_master
+   set_master_lkbs
+   set_lock_master
+*/
+
+/*
+ * Set the lock master for all LKBs in a lock queue
+ * If we are the new master of the rsb, we may have received new
+ * MSTCPY locks from other nodes already which we need to ignore
+ * when setting the new nodeid.
+ */
+
+static void set_lock_master(struct list_head *queue, int nodeid)
+{
+	struct dlm_lkb *lkb;
+
+	list_for_each_entry(lkb, queue, lkb_statequeue)
+		if (!(lkb->lkb_flags & DLM_IFL_MSTCPY))
+			lkb->lkb_nodeid = nodeid;
+}
+
+static void set_master_lkbs(struct dlm_rsb *r)
+{
+	set_lock_master(&r->res_grantqueue, r->res_nodeid);
+	set_lock_master(&r->res_convertqueue, r->res_nodeid);
+	set_lock_master(&r->res_waitqueue, r->res_nodeid);
+}
+
+/*
+ * Propogate the new master nodeid to locks
+ * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
+ * The NEW_MASTER2 flag tells recover_lvb() which rsb's to consider.
+ */
+
+static void set_new_master(struct dlm_rsb *r, int nodeid)
+{
+	lock_rsb(r);
+
+	if (nodeid == dlm_our_nodeid())
+		r->res_nodeid = 0;
+	else
+		r->res_nodeid = nodeid;
+
+	set_master_lkbs(r);
+
+	set_bit(RESFL_NEW_MASTER, &r->res_flags);
+	set_bit(RESFL_NEW_MASTER2, &r->res_flags);
+	unlock_rsb(r);
+}
+
+/*
+ * We do async lookups on rsb's that need new masters.  The rsb's
+ * waiting for a lookup reply are kept on the recover_list.
+ */
+
+static int recover_master(struct dlm_rsb *r)
+{
+	struct dlm_ls *ls = r->res_ls;
+	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
+
+	dir_nodeid = dlm_dir_nodeid(r);
+
+	if (dir_nodeid == our_nodeid) {
+		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
+				       r->res_length, &ret_nodeid);
+		if (error)
+			log_error(ls, "recover dir lookup error %d", error);
+
+		set_new_master(r, ret_nodeid);
+	} else {
+		recover_list_add(r);
+		error = dlm_send_rcom_lookup(r, dir_nodeid);
+	}
+
+	return error;
+}
+
+/*
+ * Go through local root resources and for each rsb which has a master which
+ * has departed, get the new master nodeid from the directory.  The dir will
+ * assign mastery to the first node to look up the new master.  That means
+ * we'll discover in this lookup if we're the new master of any rsb's.
+ *
+ * We fire off all the dir lookup requests individually and asynchronously to
+ * the correct dir node.
+ */
+
+int dlm_recover_masters(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r;
+	int error, count = 0;
+
+	log_debug(ls, "dlm_recover_masters");
+
+	down_read(&ls->ls_root_sem);
+	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+		if (is_master(r))
+			continue;
+
+		error = dlm_recovery_stopped(ls);
+		if (error) {
+			up_read(&ls->ls_root_sem);
+			goto out;
+		}
+
+		if (dlm_is_removed(ls, r->res_nodeid)) {
+			recover_master(r);
+			count++;
+		}
+
+		schedule();
+	}
+	up_read(&ls->ls_root_sem);
+
+	log_debug(ls, "dlm_recover_masters %d resources", count);
+
+	error = dlm_wait_function(ls, &recover_list_empty);
+ out:
+	if (error)
+		recover_list_clear(ls);
+	return error;
+}
+
+int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
+{
+	struct dlm_rsb *r;
+
+	r = recover_list_find(ls, rc->rc_id);
+	if (!r) {
+		log_error(ls, "dlm_recover_master_reply no id %"PRIx64"",
+			  rc->rc_id);
+		goto out;
+	}
+
+	set_new_master(r, rc->rc_result);
+	recover_list_del(r);
+
+	if (recover_list_empty(ls))
+		wake_up(&ls->ls_wait_general);
+ out:
+	return 0;
+}
+
+
+/* Lock recovery: rebuild the process-copy locks we hold on a
+   remastered rsb on the new rsb master.
+
+   dlm_recover_locks
+   recover_locks
+   recover_locks_queue
+   dlm_send_rcom_lock              ->  receive_rcom_lock
+                                       dlm_recover_master_copy
+   receive_rcom_lock_reply         <-
+   dlm_recover_process_copy
+*/
+
+
+/*
+ * keep a count of the number of lkb's we send to the new master; when we get
+ * an equal number of replies then recovery for the rsb is done
+ */
+
+static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
+{
+	struct dlm_lkb *lkb;
+	int error = 0;
+
+	list_for_each_entry(lkb, head, lkb_statequeue) {
+	   	error = dlm_send_rcom_lock(r, lkb);
+		if (error)
+			break;
+		r->res_recover_locks_count++;
+	}
+
+	return error;
+}
+
+static int all_queues_empty(struct dlm_rsb *r)
+{
+	if (!list_empty(&r->res_grantqueue) ||
+	    !list_empty(&r->res_convertqueue) ||
+	    !list_empty(&r->res_waitqueue))
+		return FALSE;
+	return TRUE;
+}
+
+static int recover_locks(struct dlm_rsb *r)
+{
+	int error = 0;
+
+	lock_rsb(r);
+	if (all_queues_empty(r))
+		goto out;
+
+	DLM_ASSERT(!r->res_recover_locks_count, dlm_print_rsb(r););
+
+	error = recover_locks_queue(r, &r->res_grantqueue);
+	if (error)
+		goto out;
+	error = recover_locks_queue(r, &r->res_convertqueue);
+	if (error)
+		goto out;
+	error = recover_locks_queue(r, &r->res_waitqueue);
+	if (error)
+		goto out;
+
+	if (r->res_recover_locks_count)
+		recover_list_add(r);
+ out:
+	unlock_rsb(r);
+	return error;
+}
+
+int dlm_recover_locks(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r;
+	int error, count = 0;
+
+	log_debug(ls, "dlm_recover_locks");
+
+	down_read(&ls->ls_root_sem);
+	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+		if (is_master(r)) {
+			clear_bit(RESFL_NEW_MASTER, &r->res_flags);
+			continue;
+		}
+
+		if (!test_bit(RESFL_NEW_MASTER, &r->res_flags))
+			continue;
+
+		error = dlm_recovery_stopped(ls);
+		if (error) {
+			up_read(&ls->ls_root_sem);
+			goto out;
+		}
+
+		error = recover_locks(r);
+		if (error) {
+			up_read(&ls->ls_root_sem);
+			goto out;
+		}
+
+		count += r->res_recover_locks_count;
+	}
+	up_read(&ls->ls_root_sem);
+
+	log_debug(ls, "dlm_recover_locks %d locks", count);
+
+	error = dlm_wait_function(ls, &recover_list_empty);
+ out:
+	if (error)
+		recover_list_clear(ls);
+	else
+		set_bit(LSFL_LOCKS_VALID, &ls->ls_flags);
+	return error;
+}
+
+void dlm_recovered_lock(struct dlm_rsb *r)
+{
+	r->res_recover_locks_count--;
+	if (!r->res_recover_locks_count) {
+		clear_bit(RESFL_NEW_MASTER, &r->res_flags);
+		recover_list_del(r);
+	}
+
+	if (recover_list_empty(r->res_ls))
+		wake_up(&r->res_ls->ls_wait_general);
+}
+
+/*
+ * The lvb needs to be recovered on all master rsb's.  This includes setting
+ * the VALNOTVALID flag if necessary, and determining the correct lvb contents
+ * based on the lvb's of the locks held on the rsb.
+ *
+ * RESFL_VALNOTVALID is set if there are only NL/CR locks on the rsb.  If it
+ * was already set prior to recovery, it's not cleared, regardless of locks.
+ *
+ * The LVB contents are only considered for changing when this is a new master
+ * of the rsb (NEW_MASTER2).  Then, the rsb's lvb is taken from any lkb with
+ * mode > CR.  If no lkb's exist with mode above CR, the lvb contents are taken
+ * from the lkb with the largest lvb sequence number.
+ */
+
+static void recover_lvb(struct dlm_rsb *r)
+{
+	struct dlm_lkb *lkb, *high_lkb = NULL;
+	uint32_t high_seq = 0;
+	int lock_lvb_exists = FALSE;
+	int big_lock_exists = FALSE;
+	int lvblen = r->res_ls->ls_lvblen;
+
+	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
+		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
+			continue;
+
+		lock_lvb_exists = TRUE;
+
+		if (lkb->lkb_grmode > DLM_LOCK_CR) {
+			big_lock_exists = TRUE;
+			goto setflag;
+		}
+
+		if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
+			high_lkb = lkb;
+			high_seq = lkb->lkb_lvbseq;
+		}
+	}
+
+	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
+		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
+			continue;
+
+		lock_lvb_exists = TRUE;
+
+		if (lkb->lkb_grmode > DLM_LOCK_CR) {
+			big_lock_exists = TRUE;
+			goto setflag;
+		}
+
+		if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
+			high_lkb = lkb;
+			high_seq = lkb->lkb_lvbseq;
+		}
+	}
+
+ setflag:
+	if (!lock_lvb_exists)
+		goto out;
+
+	if (!big_lock_exists)
+		set_bit(RESFL_VALNOTVALID, &r->res_flags);
+
+	/* don't mess with the lvb unless we're the new master */
+	if (!test_bit(RESFL_NEW_MASTER2, &r->res_flags))
+		goto out;
+
+	if (!r->res_lvbptr) {
+		r->res_lvbptr = allocate_lvb(r->res_ls);
+		if (!r->res_lvbptr)
+			goto out;
+	}
+
+	if (big_lock_exists) {
+		r->res_lvbseq = lkb->lkb_lvbseq;
+		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, lvblen);
+	} else if (high_lkb) {
+		r->res_lvbseq = high_lkb->lkb_lvbseq;
+		memcpy(r->res_lvbptr, high_lkb->lkb_lvbptr, lvblen);
+	} else {
+		r->res_lvbseq = 0;
+		memset(r->res_lvbptr, 0, lvblen);
+	}
+ out:
+	return;
+}
+
+/* All master rsb's flagged RECOVER_CONVERT need to be looked at.  The locks
+   converting PR->CW or CW->PR need to have their lkb_grmode set. */
+
+static void recover_conversion(struct dlm_rsb *r)
+{
+	struct dlm_lkb *lkb;
+	int grmode = -1;
+
+	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
+		if (lkb->lkb_grmode == DLM_LOCK_PR ||
+		    lkb->lkb_grmode == DLM_LOCK_CW) {
+			grmode = lkb->lkb_grmode;
+			break;
+		}
+	}
+
+	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
+		if (lkb->lkb_grmode != DLM_LOCK_IV)
+			continue;
+		if (grmode == -1)
+			lkb->lkb_grmode = lkb->lkb_rqmode;
+		else
+			lkb->lkb_grmode = grmode;
+	}
+}
+
+void dlm_recover_rsbs(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r;
+	int count = 0;
+
+	log_debug(ls, "dlm_recover_rsbs");
+
+	down_read(&ls->ls_root_sem);
+	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+		lock_rsb(r);
+		if (is_master(r)) {
+			if (test_bit(RESFL_RECOVER_CONVERT, &r->res_flags))
+				recover_conversion(r);
+			recover_lvb(r);
+			count++;
+		}
+		clear_bit(RESFL_RECOVER_CONVERT, &r->res_flags);
+		unlock_rsb(r);
+	}
+	up_read(&ls->ls_root_sem);
+
+	log_debug(ls, "dlm_recover_rsbs %d rsbs", count);
+}
+
+/* Create a single list of all root rsb's to be used during recovery */
+
+int dlm_create_root_list(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r;
+	int i, error = 0;
+
+	down_write(&ls->ls_root_sem);
+	if (!list_empty(&ls->ls_root_list)) {
+		log_error(ls, "root list not empty");
+		error = -EINVAL;
+		goto out;
+	}
+
+	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
+		read_lock(&ls->ls_rsbtbl[i].lock);
+		list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
+			list_add(&r->res_root_list, &ls->ls_root_list);
+			dlm_hold_rsb(r);
+		}
+		read_unlock(&ls->ls_rsbtbl[i].lock);
+	}
+ out:
+	up_write(&ls->ls_root_sem);
+	return error;
+}
+
+void dlm_release_root_list(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r, *safe;
+
+	down_write(&ls->ls_root_sem);
+	list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
+		list_del_init(&r->res_root_list);
+		dlm_put_rsb(r);
+	}
+	up_write(&ls->ls_root_sem);
+}
+
+void dlm_clear_toss_list(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r, *safe;
+	int i;
+
+	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
+		write_lock(&ls->ls_rsbtbl[i].lock);
+		list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss,
+					 res_hashchain) {
+			list_del(&r->res_hashchain);
+			free_rsb(r);
+		}
+		write_unlock(&ls->ls_rsbtbl[i].lock);
+	}
+}
+
--- a/drivers/dlm/recover.h	1970-01-01 07:30:00.000000000 +0730
+++ b/drivers/dlm/recover.h	2005-05-12 23:13:15.830485360 +0800
@@ -0,0 +1,32 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
+**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#ifndef __RECOVER_DOT_H__
+#define __RECOVER_DOT_H__
+
+int dlm_recovery_stopped(struct dlm_ls *ls);
+int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls));
+int dlm_recover_masters(struct dlm_ls *ls);
+int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc);
+int dlm_recover_locks(struct dlm_ls *ls);
+void dlm_recovered_lock(struct dlm_rsb *r);
+int dlm_create_root_list(struct dlm_ls *ls);
+void dlm_release_root_list(struct dlm_ls *ls);
+void dlm_clear_toss_list(struct dlm_ls *ls);
+void dlm_recover_rsbs(struct dlm_ls *ls);
+int dlm_recover_members_wait(struct dlm_ls *ls);
+int dlm_recover_directory_wait(struct dlm_ls *ls);
+int dlm_recover_locks_wait(struct dlm_ls *ls);
+
+#endif				/* __RECOVER_DOT_H__ */
+
--- a/drivers/dlm/recoverd.c	1970-01-01 07:30:00.000000000 +0730
+++ b/drivers/dlm/recoverd.c	2005-05-12 23:13:15.830485360 +0800
@@ -0,0 +1,714 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
+**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include "dlm_internal.h"
+#include "lockspace.h"
+#include "member.h"
+#include "dir.h"
+#include "ast.h"
+#include "recover.h"
+#include "lowcomms.h"
+#include "lock.h"
+#include "requestqueue.h"
+
+/*
+ * next_move actions
+ */
+
+#define DO_STOP             (1)
+#define DO_START            (2)
+#define DO_FINISH           (3)
+#define DO_FINISH_STOP      (4)
+#define DO_FINISH_START     (5)
+
+static void set_start_done(struct dlm_ls *ls, int event_id)
+{
+	spin_lock(&ls->ls_recover_lock);
+	ls->ls_startdone = event_id;
+	spin_unlock(&ls->ls_recover_lock);
+
+	kobject_uevent(&ls->ls_kobj, KOBJ_CHANGE, NULL);
+}
+
+static int enable_locking(struct dlm_ls *ls, int event_id)
+{
+	int error = 0;
+
+	spin_lock(&ls->ls_recover_lock);
+	if (ls->ls_last_stop < event_id) {
+		set_bit(LSFL_LS_RUN, &ls->ls_flags);
+		up_write(&ls->ls_in_recovery);
+	} else {
+		error = -EINTR;
+		log_debug(ls, "enable_locking: abort %d", event_id);
+	}
+	spin_unlock(&ls->ls_recover_lock);
+	return error;
+}
+
+static int ls_first_start(struct dlm_ls *ls, struct dlm_recover *rv)
+{
+	int error;
+
+	log_debug(ls, "recover event %u (first)", rv->event_id);
+
+	down(&ls->ls_recoverd_active);
+
+	error = dlm_recover_members_first(ls, rv);
+	if (error) {
+		log_error(ls, "recover_members first failed %d", error);
+		goto out;
+	}
+
+	error = dlm_recover_directory(ls);
+	if (error) {
+		log_error(ls, "recover_directory failed %d", error);
+		goto out;
+	}
+
+	error = dlm_recover_directory_wait(ls);
+	if (error) {
+		log_error(ls, "recover_directory_wait failed %d", error);
+		goto out;
+	}
+
+	log_debug(ls, "recover event %u done", rv->event_id);
+	set_start_done(ls, rv->event_id);
+
+ out:
+	up(&ls->ls_recoverd_active);
+	return error;
+}
+
+/*
+ * We are given here a new group of nodes which are in the lockspace.  We first
+ * figure out the differences in ls membership from when we were last running.
+ * If nodes from before are gone, then there will be some lock recovery to do.
+ * If there are only nodes which have joined, then there's no lock recovery.
+ *
+ * Lockspace recovery for failed nodes must be completed before any nodes are
+ * allowed to join or leave the lockspace.
+ */
+
+static int ls_reconfig(struct dlm_ls *ls, struct dlm_recover *rv)
+{
+	unsigned long start;
+	int error, neg = 0;
+
+	log_debug(ls, "recover event %u", rv->event_id);
+
+	down(&ls->ls_recoverd_active);
+
+	/*
+	 * Suspending and resuming dlm_astd ensures that no lkb's from this ls
+	 * will be processed by dlm_astd during recovery.
+	 */
+
+	dlm_astd_suspend();
+	dlm_astd_resume();
+
+	/*
+	 * This list of root rsb's will be the basis of most of the recovery
+	 * routines.
+	 */
+
+	dlm_create_root_list(ls);
+
+	/*
+	 * Free all the tossed rsb's so we don't have to recover them.
+	 */
+
+	dlm_clear_toss_list(ls);
+
+	/*
+	 * Add or remove nodes from the lockspace's ls_nodes list.
+	 * Also waits for all nodes to complete dlm_recover_members.
+	 */
+
+	error = dlm_recover_members(ls, rv, &neg);
+	if (error) {
+		log_error(ls, "recover_members failed %d", error);
+		goto fail;
+	}
+	start = jiffies;
+
+	/*
+	 * Rebuild our own share of the directory by collecting from all other
+	 * nodes their master rsb names that hash to us.
+	 */
+
+	error = dlm_recover_directory(ls);
+	if (error) {
+		log_error(ls, "recover_directory failed %d", error);
+		goto fail;
+	}
+
+	/*
+	 * Purge directory-related requests that are saved in requestqueue.
+	 * All dir requests from before recovery are invalid now due to the dir
+	 * rebuild and will be resent by the requesting nodes.
+	 */
+
+	dlm_purge_requestqueue(ls);
+
+	/*
+	 * Wait for all nodes to complete directory rebuild.
+	 */
+
+	error = dlm_recover_directory_wait(ls);
+	if (error) {
+		log_error(ls, "recover_directory_wait failed %d", error);
+		goto fail;
+	}
+
+	/*
+	 * We may have outstanding operations that are waiting for a reply from
+	 * a failed node.  Mark these to be resent after recovery.  Unlock and
+	 * cancel ops can just be completed.
+	 */
+
+	dlm_recover_waiters_pre(ls);
+
+	error = dlm_recovery_stopped(ls);
+	if (error)
+		goto fail;
+
+	if (neg) {
+		/*
+		 * Clear lkb's for departed nodes.
+		 */
+
+		dlm_purge_locks(ls);
+
+		/*
+		 * Get new master nodeid's for rsb's that were mastered on
+		 * departed nodes.
+		 */
+
+		error = dlm_recover_masters(ls);
+		if (error) {
+			log_error(ls, "recover_masters failed %d", error);
+			goto fail;
+		}
+
+		/*
+		 * Send our locks on remastered rsb's to the new masters.
+		 */
+
+		error = dlm_recover_locks(ls);
+		if (error) {
+			log_error(ls, "recover_locks failed %d", error);
+			goto fail;
+		}
+
+		error = dlm_recover_locks_wait(ls);
+		if (error) {
+			log_error(ls, "recover_locks_wait failed %d", error);
+			goto fail;
+		}
+
+		/*
+		 * Finalize state in master rsb's now that all locks can be
+		 * checked.  This includes conversion resolution and lvb
+		 * settings.
+		 */
+
+		dlm_recover_rsbs(ls);
+	}
+
+	dlm_release_root_list(ls);
+
+	log_debug(ls, "recover event %u done: %u ms", rv->event_id,
+		  jiffies_to_msecs(jiffies - start));
+	set_start_done(ls, rv->event_id);
+	up(&ls->ls_recoverd_active);
+
+	return 0;
+
+ fail:
+	log_debug(ls, "recover event %d error %d", rv->event_id, error);
+	up(&ls->ls_recoverd_active);
+	return error;
+}
+
+/*
+ * Between calls to this routine for a ls, there can be multiple stop/start
+ * events where every start but the latest is cancelled by stops.  There can
+ * only be a single finish because every finish requires us to call start_done.
+ * A single finish event could be followed by multiple stop/start events.  This
+ * routine takes any combination of events and boils them down to one course of
+ * action.
+ */
+
+static int next_move(struct dlm_ls *ls, struct dlm_recover **rv_out,
+		     int *finish_out)
+{
+	LIST_HEAD(events);
+	unsigned int cmd = 0, stop, start, finish;
+	unsigned int last_stop, last_start, last_finish;
+	struct dlm_recover *rv = NULL, *start_rv = NULL;
+
+	spin_lock(&ls->ls_recover_lock);
+
+	stop = test_and_clear_bit(LSFL_LS_STOP, &ls->ls_flags) ? 1 : 0;
+	start = test_and_clear_bit(LSFL_LS_START, &ls->ls_flags) ? 1 : 0;
+	finish = test_and_clear_bit(LSFL_LS_FINISH, &ls->ls_flags) ? 1 : 0;
+
+	last_stop = ls->ls_last_stop;
+	last_start = ls->ls_last_start;
+	last_finish = ls->ls_last_finish;
+
+	while (!list_empty(&ls->ls_recover)) {
+		rv = list_entry(ls->ls_recover.next, struct dlm_recover, list);
+		list_del(&rv->list);
+		list_add_tail(&rv->list, &events);
+	}
+
+	/*
+	 * There are two cases where we need to adjust these event values:
+	 * 1. - we get a first start
+	 *    - we get a stop
+	 *    - we process the start + stop here and notice this special case
+	 *
+	 * 2. - we get a first start
+	 *    - we process the start
+	 *    - we get a stop
+	 *    - we process the stop here and notice this special case
+	 *
+	 * In both cases, the first start we received was aborted by a
+	 * stop before we received a finish.  last_finish being zero is the
+	 * indication that this is the "first" start, i.e. we've not yet
+	 * finished a start; if we had, last_finish would be non-zero.
+	 * Part of the problem arises from the fact that when we initially
+	 * get start/stop/start, we may get the same event id for both starts
+	 * (since the first was cancelled).
+	 *
+	 * In both cases, last_start and last_stop will be equal.
+	 * In both cases, finish=0.
+	 * In the first case start=1 && stop=1.
+	 * In the second case start=0 && stop=1.
+	 *
+	 * In both cases, we need to make adjustments to values so:
+	 * - we process the current event (now) as a normal stop
+	 * - the next start we receive will be processed normally
+	 *   (taking into account the assertions below)
+	 *
+	 * In the first case, dlm_ls_start() will have printed the
+	 * "repeated start" warning.
+	 *
+	 * In the first case we need to get rid of the recover event struct.
+	 *
+	 * - set stop=1, start=0, finish=0 for case 4 below
+	 * - last_stop and last_start must be set equal per the case 4 assert
+	 * - ls_last_stop = 0 so the next start will be larger
+	 * - ls_last_start = 0 not really necessary (avoids dlm_ls_start print)
+	 */
+
+	if (!last_finish && (last_start == last_stop)) {
+		log_debug(ls, "move reset %u,%u,%u ids %u,%u,%u", stop,
+			  start, finish, last_stop, last_start, last_finish);
+		stop = 1;
+		start = 0;
+		finish = 0;
+		last_stop = 0;
+		last_start = 0;
+		ls->ls_last_stop = 0;
+		ls->ls_last_start = 0;
+
+		while (!list_empty(&events)) {
+			rv = list_entry(events.next, struct dlm_recover, list);
+			list_del(&rv->list);
+			kfree(rv->nodeids);
+			kfree(rv);
+		}
+	}
+	spin_unlock(&ls->ls_recover_lock);
+
+	log_debug(ls, "move flags %u,%u,%u ids %u,%u,%u", stop, start, finish,
+		  last_stop, last_start, last_finish);
+
+	/*
+	 * Toss start events which have since been cancelled.
+	 */
+
+	while (!list_empty(&events)) {
+		DLM_ASSERT(start,);
+		rv = list_entry(events.next, struct dlm_recover, list);
+		list_del(&rv->list);
+
+		if (rv->event_id <= last_stop) {
+			log_debug(ls, "move skip event %u", rv->event_id);
+			kfree(rv->nodeids);
+			kfree(rv);
+			rv = NULL;
+		} else {
+			log_debug(ls, "move use event %u", rv->event_id);
+			DLM_ASSERT(!start_rv,);
+			start_rv = rv;
+		}
+	}
+
+	/*
+	 * Eight possible combinations of events.
+	 */
+
+	/* 0 */
+	if (!stop && !start && !finish) {
+		DLM_ASSERT(!start_rv,);
+		cmd = 0;
+		goto out;
+	}
+
+	/* 1 */
+	if (!stop && !start && finish) {
+		DLM_ASSERT(!start_rv,);
+		DLM_ASSERT(last_start > last_stop,);
+		DLM_ASSERT(last_finish == last_start,);
+		cmd = DO_FINISH;
+		*finish_out = last_finish;
+		goto out;
+	}
+
+	/* 2 */
+	if (!stop && start && !finish) {
+		DLM_ASSERT(start_rv,);
+		DLM_ASSERT(last_start > last_stop,);
+		cmd = DO_START;
+		*rv_out = start_rv;
+		goto out;
+	}
+
+	/* 3 */
+	if (!stop && start && finish) {
+		DLM_ASSERT(0, printk("finish and start with no stop\n"););
+	}
+
+	/* 4 */
+	if (stop && !start && !finish) {
+		DLM_ASSERT(!start_rv,);
+		DLM_ASSERT(last_start == last_stop,);
+		cmd = DO_STOP;
+		goto out;
+	}
+
+	/* 5 */
+	if (stop && !start && finish) {
+		DLM_ASSERT(!start_rv,);
+		DLM_ASSERT(last_finish == last_start,);
+		DLM_ASSERT(last_stop == last_start,);
+		cmd = DO_FINISH_STOP;
+		*finish_out = last_finish;
+		goto out;
+	}
+
+	/* 6 */
+	if (stop && start && !finish) {
+		if (start_rv) {
+			DLM_ASSERT(last_start > last_stop,);
+			cmd = DO_START;
+			*rv_out = start_rv;
+		} else {
+			DLM_ASSERT(last_stop == last_start,);
+			cmd = DO_STOP;
+		}
+		goto out;
+	}
+
+	/* 7 */
+	if (stop && start && finish) {
+		if (start_rv) {
+			DLM_ASSERT(last_start > last_stop,);
+			DLM_ASSERT(last_start > last_finish,);
+			cmd = DO_FINISH_START;
+			*finish_out = last_finish;
+			*rv_out = start_rv;
+		} else {
+			DLM_ASSERT(last_start == last_stop,);
+			DLM_ASSERT(last_start > last_finish,);
+			cmd = DO_FINISH_STOP;
+			*finish_out = last_finish;
+		}
+		goto out;
+	}
+
+ out:
+	return cmd;
+}
+
+/*
+ * This function decides what to do given every combination of current
+ * lockspace state and next lockspace state.
+ */
+
+static void do_ls_recovery(struct dlm_ls *ls)
+{
+	struct dlm_recover *rv = NULL;
+	int error, cur_state, next_state = 0, do_now, finish_event = 0;
+
+	do_now = next_move(ls, &rv, &finish_event);
+	if (!do_now)
+		goto out;
+
+	cur_state = ls->ls_state;
+	next_state = 0;
+
+	DLM_ASSERT(!test_bit(LSFL_LS_RUN, &ls->ls_flags),
+		    log_error(ls, "curstate=%d donow=%d", cur_state, do_now););
+
+	/*
+	 * LSST_CLEAR - we're not in any recovery state.  We can get a stop or
+	 * a stop and start which equates with a START.
+	 */
+
+	if (cur_state == LSST_CLEAR) {
+		switch (do_now) {
+		case DO_STOP:
+			next_state = LSST_WAIT_START;
+			break;
+
+		case DO_START:
+			error = ls_reconfig(ls, rv);
+			if (error)
+				next_state = LSST_WAIT_START;
+			else
+				next_state = LSST_RECONFIG_DONE;
+			break;
+
+		case DO_FINISH:	/* invalid */
+		case DO_FINISH_STOP:	/* invalid */
+		case DO_FINISH_START:	/* invalid */
+		default:
+			DLM_ASSERT(0,);
+		}
+		goto out;
+	}
+
+	/*
+	 * LSST_WAIT_START - we're not running because of getting a stop or
+	 * failing a start.  We wait in this state for another stop/start or
+	 * just the next start to begin another reconfig attempt.
+	 */
+
+	if (cur_state == LSST_WAIT_START) {
+		switch (do_now) {
+		case DO_STOP:
+			break;
+
+		case DO_START:
+			error = ls_reconfig(ls, rv);
+			if (error)
+				next_state = LSST_WAIT_START;
+			else
+				next_state = LSST_RECONFIG_DONE;
+			break;
+
+		case DO_FINISH:	/* invalid */
+		case DO_FINISH_STOP:	/* invalid */
+		case DO_FINISH_START:	/* invalid */
+		default:
+			DLM_ASSERT(0,);
+		}
+		goto out;
+	}
+
+	/*
+	 * LSST_RECONFIG_DONE - we entered this state after successfully
+	 * completing ls_reconfig and calling set_start_done.  We expect to get
+	 * a finish if everything goes ok.  A finish could be followed by stop
+	 * or stop/start before we get here to check it.  Or a finish may never
+	 * happen, only stop or stop/start.
+	 */
+
+	if (cur_state == LSST_RECONFIG_DONE) {
+		switch (do_now) {
+		case DO_FINISH:
+			dlm_clear_members_finish(ls, finish_event);
+			next_state = LSST_CLEAR;
+
+			error = enable_locking(ls, finish_event);
+			if (error)
+				break;
+
+			error = dlm_process_requestqueue(ls);
+			if (error)
+				break;
+
+			error = dlm_recover_waiters_post(ls);
+			if (error)
+				break;
+
+			dlm_grant_after_purge(ls);
+
+			dlm_astd_wake();
+
+			log_debug(ls, "recover event %u finished", finish_event);
+			break;
+
+		case DO_STOP:
+			next_state = LSST_WAIT_START;
+			break;
+
+		case DO_FINISH_STOP:
+			dlm_clear_members_finish(ls, finish_event);
+			next_state = LSST_WAIT_START;
+			break;
+
+		case DO_FINISH_START:
+			dlm_clear_members_finish(ls, finish_event);
+			/* fall into DO_START */
+
+		case DO_START:
+			error = ls_reconfig(ls, rv);
+			if (error)
+				next_state = LSST_WAIT_START;
+			else
+				next_state = LSST_RECONFIG_DONE;
+			break;
+
+		default:
+			DLM_ASSERT(0,);
+		}
+		goto out;
+	}
+
+	/*
+	 * LSST_INIT - state after ls is created and before it has been
+	 * started.  A start operation will cause the ls to be started for the
+	 * first time.  A failed start will cause to just wait in INIT for
+	 * another stop/start.
+	 */
+
+	if (cur_state == LSST_INIT) {
+		switch (do_now) {
+		case DO_START:
+			error = ls_first_start(ls, rv);
+			if (!error)
+				next_state = LSST_INIT_DONE;
+			break;
+
+		case DO_STOP:
+			break;
+
+		case DO_FINISH:	/* invalid */
+		case DO_FINISH_STOP:	/* invalid */
+		case DO_FINISH_START:	/* invalid */
+		default:
+			DLM_ASSERT(0,);
+		}
+		goto out;
+	}
+
+	/*
+	 * LSST_INIT_DONE - after the first start operation is completed
+	 * successfully and set_start_done() called.  If there are no errors, a
+	 * finish will arrive next and we'll move to LSST_CLEAR.
+	 */
+
+	if (cur_state == LSST_INIT_DONE) {
+		switch (do_now) {
+		case DO_STOP:
+		case DO_FINISH_STOP:
+			next_state = LSST_WAIT_START;
+			break;
+
+		case DO_START:
+		case DO_FINISH_START:
+			error = ls_reconfig(ls, rv);
+			if (error)
+				next_state = LSST_WAIT_START;
+			else
+				next_state = LSST_RECONFIG_DONE;
+			break;
+
+		case DO_FINISH:
+			next_state = LSST_CLEAR;
+
+			enable_locking(ls, finish_event);
+
+			dlm_process_requestqueue(ls);
+
+			dlm_astd_wake();
+
+			log_debug(ls, "recover event %u finished", finish_event);
+			break;
+
+		default:
+			DLM_ASSERT(0,);
+		}
+		goto out;
+	}
+
+ out:
+	if (next_state)
+		ls->ls_state = next_state;
+
+	if (rv) {
+		kfree(rv->nodeids);
+		kfree(rv);
+	}
+}
+
+int dlm_recoverd(void *arg)
+{
+	struct dlm_ls *ls;
+
+	ls = dlm_find_lockspace_local(arg);
+
+	while (!kthread_should_stop()) {
+		set_current_state(TASK_INTERRUPTIBLE);
+		if (!test_bit(LSFL_WORK, &ls->ls_flags))
+			schedule();
+		set_current_state(TASK_RUNNING);
+
+		if (test_and_clear_bit(LSFL_WORK, &ls->ls_flags))
+			do_ls_recovery(ls);
+	}
+
+	dlm_put_lockspace(ls);
+	return 0;
+}
+
+void dlm_recoverd_kick(struct dlm_ls *ls)
+{
+	set_bit(LSFL_WORK, &ls->ls_flags);
+	wake_up_process(ls->ls_recoverd_task);
+}
+
+int dlm_recoverd_start(struct dlm_ls *ls)
+{
+	struct task_struct *p;
+	int error = 0;
+
+	p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
+	if (IS_ERR(p))
+		error = PTR_ERR(p);
+	else
+                ls->ls_recoverd_task = p;
+	return error;
+}
+
+void dlm_recoverd_stop(struct dlm_ls *ls)
+{
+	kthread_stop(ls->ls_recoverd_task);
+}
+
+void dlm_recoverd_suspend(struct dlm_ls *ls)
+{
+	down(&ls->ls_recoverd_active);
+}
+
+void dlm_recoverd_resume(struct dlm_ls *ls)
+{
+	up(&ls->ls_recoverd_active);
+}
+
--- a/drivers/dlm/recoverd.h	1970-01-01 07:30:00.000000000 +0730
+++ b/drivers/dlm/recoverd.h	2005-05-12 23:13:15.830485360 +0800
@@ -0,0 +1,24 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
+**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#ifndef __RECOVERD_DOT_H__
+#define __RECOVERD_DOT_H__
+
+void dlm_recoverd_kick(struct dlm_ls *ls);
+void dlm_recoverd_stop(struct dlm_ls *ls);
+int dlm_recoverd_start(struct dlm_ls *ls);
+void dlm_recoverd_suspend(struct dlm_ls *ls);
+void dlm_recoverd_resume(struct dlm_ls *ls);
+
+#endif				/* __RECOVERD_DOT_H__ */
+
--- a/drivers/dlm/requestqueue.c	1970-01-01 07:30:00.000000000 +0730
+++ b/drivers/dlm/requestqueue.c	2005-05-12 23:13:15.830485360 +0800
@@ -0,0 +1,144 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include "dlm_internal.h"
+#include "member.h"
+#include "lock.h"
+
+struct rq_entry {
+	struct list_head list;
+	int nodeid;
+	char request[1];
+};
+
+/*
+ * Requests received while the lockspace is in recovery get added to the
+ * request queue and processed when recovery is complete.  This happens when
+ * the lockspace is suspended on some nodes before it is on others, or the
+ * lockspace is enabled on some while still suspended on others.
+ */
+
+void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd)
+{
+	struct rq_entry *e;
+	int length = hd->h_length;
+
+	if (dlm_is_removed(ls, nodeid))
+		return;
+
+	e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL);
+	if (!e) {
+		log_print("dlm_add_requestqueue: out of memory\n");
+		return;
+	}
+
+	e->nodeid = nodeid;
+	memcpy(e->request, hd, length);
+
+	down(&ls->ls_requestqueue_lock);
+	list_add_tail(&e->list, &ls->ls_requestqueue);
+	up(&ls->ls_requestqueue_lock);
+}
+
+int dlm_process_requestqueue(struct dlm_ls *ls)
+{
+	struct rq_entry *e;
+	struct dlm_header *hd;
+	int error = 0;
+
+	down(&ls->ls_requestqueue_lock);
+
+	for (;;) {
+		if (list_empty(&ls->ls_requestqueue)) {
+			up(&ls->ls_requestqueue_lock);
+			error = 0;
+			break;
+		}
+		e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
+		up(&ls->ls_requestqueue_lock);
+
+		hd = (struct dlm_header *) e->request;
+		error = dlm_receive_message(hd, e->nodeid, TRUE);
+
+		if (error == -EINTR) {
+			/* entry is left on requestqueue */
+			log_debug(ls, "process_requestqueue abort eintr");
+			break;
+		}
+
+		down(&ls->ls_requestqueue_lock);
+		list_del(&e->list);
+		kfree(e);
+
+		if (!test_bit(LSFL_LS_RUN, &ls->ls_flags)) {
+			log_debug(ls, "process_requestqueue abort ls_run");
+			up(&ls->ls_requestqueue_lock);
+			error = -EINTR;
+			break;
+		}
+		schedule();
+	}
+
+	return error;
+}
+
+/*
+ * After recovery is done, locking is resumed and dlm_recoverd takes all the
+ * saved requests and processes them as they would have been by dlm_recvd.  At
+ * the same time, dlm_recvd will start receiving new requests from remote
+ * nodes.  We want to delay dlm_recvd processing new requests until
+ * dlm_recoverd has finished processing the old saved requests.
+ */
+
+void dlm_wait_requestqueue(struct dlm_ls *ls)
+{
+	for (;;) {
+		down(&ls->ls_requestqueue_lock);
+		if (list_empty(&ls->ls_requestqueue))
+			break;
+		if (!test_bit(LSFL_LS_RUN, &ls->ls_flags))
+			break;
+		up(&ls->ls_requestqueue_lock);
+		schedule();
+	}
+	up(&ls->ls_requestqueue_lock);
+}
+
+/*
+ * Dir lookups and lookup replies send before recovery are invalid because the
+ * directory is rebuilt during recovery, so don't save any requests of this
+ * type.  Don't save any requests from a node that's being removed either.
+ */
+
+void dlm_purge_requestqueue(struct dlm_ls *ls)
+{
+	struct dlm_message *ms;
+	struct rq_entry *e, *safe;
+	uint32_t mstype;
+
+	down(&ls->ls_requestqueue_lock);
+	list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
+
+		ms = (struct dlm_message *) e->request;
+		mstype = ms->m_type;
+
+		if (dlm_is_removed(ls, e->nodeid) ||
+		    mstype == DLM_MSG_REMOVE ||
+	            mstype == DLM_MSG_LOOKUP ||
+	            mstype == DLM_MSG_LOOKUP_REPLY) {
+			list_del(&e->list);
+			kfree(e);
+		}
+	}
+	up(&ls->ls_requestqueue_lock);
+}
+
--- a/drivers/dlm/requestqueue.h	1970-01-01 07:30:00.000000000 +0730
+++ b/drivers/dlm/requestqueue.h	2005-05-12 23:13:15.830485360 +0800
@@ -0,0 +1,22 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#ifndef __REQUESTQUEUE_DOT_H__
+#define __REQUESTQUEUE_DOT_H__
+
+void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd);
+int dlm_process_requestqueue(struct dlm_ls *ls);
+void dlm_wait_requestqueue(struct dlm_ls *ls);
+void dlm_purge_requestqueue(struct dlm_ls *ls);
+
+#endif
+
--- a/drivers/dlm/member.c	1970-01-01 07:30:00.000000000 +0730
+++ b/drivers/dlm/member.c	2005-05-12 23:13:15.829485512 +0800
@@ -0,0 +1,356 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include "dlm_internal.h"
+#include "member_sysfs.h"
+#include "lockspace.h"
+#include "member.h"
+#include "recoverd.h"
+#include "recover.h"
+#include "lowcomms.h"
+#include "rcom.h"
+
+/*
+ * Following called by dlm_recoverd thread
+ */
+
+static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
+{
+	struct dlm_member *memb = NULL;
+	struct list_head *tmp;
+	struct list_head *newlist = &new->list;
+	struct list_head *head = &ls->ls_nodes;
+
+	list_for_each(tmp, head) {
+		memb = list_entry(tmp, struct dlm_member, list);
+		if (new->nodeid < memb->nodeid)
+			break;
+	}
+
+	if (!memb)
+		list_add_tail(newlist, head);
+	else {
+		/* FIXME: can use list macro here */
+		newlist->prev = tmp->prev;
+		newlist->next = tmp;
+		tmp->prev->next = newlist;
+		tmp->prev = newlist;
+	}
+}
+
+int dlm_add_member(struct dlm_ls *ls, int nodeid)
+{
+	struct dlm_member *memb;
+
+	memb = kmalloc(sizeof(struct dlm_member), GFP_KERNEL);
+	if (!memb)
+		return -ENOMEM;
+
+	memb->nodeid = nodeid;
+	add_ordered_member(ls, memb);
+	ls->ls_num_nodes++;
+	return 0;
+}
+
+void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
+{
+	list_move(&memb->list, &ls->ls_nodes_gone);
+	ls->ls_num_nodes--;
+}
+
+int dlm_is_member(struct dlm_ls *ls, int nodeid)
+{
+	struct dlm_member *memb;
+
+	list_for_each_entry(memb, &ls->ls_nodes, list) {
+		if (memb->nodeid == nodeid)
+			return TRUE;
+	}
+	return FALSE;
+}
+
+int dlm_is_removed(struct dlm_ls *ls, int nodeid)
+{
+	struct dlm_member *memb;
+
+	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
+		if (memb->nodeid == nodeid)
+			return TRUE;
+	}
+	return FALSE;
+}
+
+static void clear_memb_list(struct list_head *head)
+{
+	struct dlm_member *memb;
+
+	while (!list_empty(head)) {
+		memb = list_entry(head->next, struct dlm_member, list);
+		list_del(&memb->list);
+		kfree(memb);
+	}
+}
+
+void dlm_clear_members(struct dlm_ls *ls)
+{
+	clear_memb_list(&ls->ls_nodes);
+	ls->ls_num_nodes = 0;
+}
+
+void dlm_clear_members_gone(struct dlm_ls *ls)
+{
+	clear_memb_list(&ls->ls_nodes_gone);
+}
+
+void dlm_clear_members_finish(struct dlm_ls *ls, int finish_event)
+{
+	struct dlm_member *memb, *safe;
+
+	list_for_each_entry_safe(memb, safe, &ls->ls_nodes_gone, list) {
+		if (memb->gone_event <= finish_event) {
+			list_del(&memb->list);
+			kfree(memb);
+		}
+	}
+}
+
+static void make_member_array(struct dlm_ls *ls)
+{
+	struct dlm_member *memb;
+	int i = 0, *array;
+
+	if (ls->ls_node_array) {
+		kfree(ls->ls_node_array);
+		ls->ls_node_array = NULL;
+	}
+
+	array = kmalloc(sizeof(int) * ls->ls_num_nodes, GFP_KERNEL);
+	if (!array)
+		return;
+
+	list_for_each_entry(memb, &ls->ls_nodes, list)
+		array[i++] = memb->nodeid;
+
+	ls->ls_node_array = array;
+}
+
+/* send a status request to all members just to establish comms connections */
+
+static void ping_members(struct dlm_ls *ls)
+{
+	struct dlm_member *memb;
+	list_for_each_entry(memb, &ls->ls_nodes, list)
+		dlm_rcom_status(ls, memb->nodeid);
+}
+
+int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
+{
+	struct dlm_member *memb, *safe;
+	int i, error, found, pos = 0, neg = 0, low = -1;
+
+	/* move departed members from ls_nodes to ls_nodes_gone */
+
+	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
+		found = FALSE;
+		for (i = 0; i < rv->node_count; i++) {
+			if (memb->nodeid == rv->nodeids[i]) {
+				found = TRUE;
+				break;
+			}
+		}
+
+		if (!found) {
+			neg++;
+			memb->gone_event = rv->event_id;
+			dlm_remove_member(ls, memb);
+			log_debug(ls, "remove member %d", memb->nodeid);
+		}
+	}
+
+	/* add new members to ls_nodes */
+
+	for (i = 0; i < rv->node_count; i++) {
+		if (dlm_is_member(ls, rv->nodeids[i]))
+			continue;
+		dlm_add_member(ls, rv->nodeids[i]);
+		pos++;
+		log_debug(ls, "add member %d", rv->nodeids[i]);
+	}
+
+	list_for_each_entry(memb, &ls->ls_nodes, list) {
+		if (low == -1 || memb->nodeid < low)
+			low = memb->nodeid;
+	}
+	ls->ls_low_nodeid = low;
+
+	make_member_array(ls);
+	set_bit(LSFL_NODES_VALID, &ls->ls_flags);
+	*neg_out = neg;
+
+	ping_members(ls);
+
+	error = dlm_recover_members_wait(ls);
+	log_debug(ls, "total members %d", ls->ls_num_nodes);
+	return error;
+}
+
+int dlm_recover_members_first(struct dlm_ls *ls, struct dlm_recover *rv)
+{
+	int i, error, nodeid, low = -1;
+
+	dlm_clear_members(ls);
+
+	log_debug(ls, "add members");
+
+	for (i = 0; i < rv->node_count; i++) {
+		nodeid = rv->nodeids[i];
+		dlm_add_member(ls, nodeid);
+
+		if (low == -1 || nodeid < low)
+			low = nodeid;
+	}
+	ls->ls_low_nodeid = low;
+
+	make_member_array(ls);
+	set_bit(LSFL_NODES_VALID, &ls->ls_flags);
+
+	ping_members(ls);
+
+	error = dlm_recover_members_wait(ls);
+	log_debug(ls, "total members %d", ls->ls_num_nodes);
+	return error;
+}
+
+/*
+ * Following called from member_sysfs.c
+ */
+
+int dlm_ls_terminate(struct dlm_ls *ls)
+{
+	spin_lock(&ls->ls_recover_lock);
+	set_bit(LSFL_LS_TERMINATE, &ls->ls_flags);
+	set_bit(LSFL_JOIN_DONE, &ls->ls_flags);
+	set_bit(LSFL_LEAVE_DONE, &ls->ls_flags);
+	spin_unlock(&ls->ls_recover_lock);
+	wake_up(&ls->ls_wait_member);
+	log_error(ls, "dlm_ls_terminate");
+	return 0;
+}
+
+int dlm_ls_stop(struct dlm_ls *ls)
+{
+	int new;
+
+	spin_lock(&ls->ls_recover_lock);
+	ls->ls_last_stop = ls->ls_last_start;
+	set_bit(LSFL_LS_STOP, &ls->ls_flags);
+	new = test_and_clear_bit(LSFL_LS_RUN, &ls->ls_flags);
+	spin_unlock(&ls->ls_recover_lock);
+
+	/*
+	 * This in_recovery lock does two things:
+	 *
+	 * 1) Keeps this function from returning until all threads are out
+	 *    of locking routines and locking is truely stopped.
+	 * 2) Keeps any new requests from being processed until it's unlocked
+	 *    when recovery is complete.
+	 */
+
+	if (new)
+		down_write(&ls->ls_in_recovery);
+
+	/*
+	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
+	 * running) has noticed the clearing of LS_RUN above and quit
+	 * processing the previous recovery.  This will be true for all nodes
+	 * before any nodes get the start.
+	 */
+
+	dlm_recoverd_suspend(ls);
+	clear_bit(LSFL_DIR_VALID, &ls->ls_flags);
+	clear_bit(LSFL_ALL_DIR_VALID, &ls->ls_flags);
+	clear_bit(LSFL_NODES_VALID, &ls->ls_flags);
+	clear_bit(LSFL_ALL_NODES_VALID, &ls->ls_flags);
+	dlm_recoverd_resume(ls);
+	dlm_recoverd_kick(ls);
+	return 0;
+}
+
+int dlm_ls_start(struct dlm_ls *ls, int event_nr)
+{
+	struct dlm_recover *rv;
+	int error = 0;
+
+	rv = kmalloc(sizeof(struct dlm_recover), GFP_KERNEL);
+	if (!rv)
+		return -ENOMEM;
+	memset(rv, 0, sizeof(struct dlm_recover));
+
+	spin_lock(&ls->ls_recover_lock);
+
+	if (test_bit(LSFL_LS_RUN, &ls->ls_flags)) {
+		spin_unlock(&ls->ls_recover_lock);
+		log_error(ls, "start ignored: lockspace running");
+		kfree(rv);
+		error = -EINVAL;
+		goto out;
+	}
+
+	if (!ls->ls_nodeids_next) {
+		spin_unlock(&ls->ls_recover_lock);
+		log_error(ls, "start ignored: existing nodeids_next");
+		kfree(rv);
+		error = -EINVAL;
+		goto out;
+	}
+
+	if (event_nr <= ls->ls_last_start) {
+		spin_unlock(&ls->ls_recover_lock);
+		log_error(ls, "start event_nr %d not greater than last %d",
+			  event_nr, ls->ls_last_start);
+		kfree(rv);
+		error = -EINVAL;
+		goto out;
+	}
+
+	rv->nodeids = ls->ls_nodeids_next;
+	ls->ls_nodeids_next = NULL;
+	rv->node_count = ls->ls_nodeids_next_count;
+	rv->event_id = event_nr;
+	ls->ls_last_start = event_nr;
+	list_add_tail(&rv->list, &ls->ls_recover);
+	set_bit(LSFL_LS_START, &ls->ls_flags);
+	spin_unlock(&ls->ls_recover_lock);
+
+	set_bit(LSFL_JOIN_DONE, &ls->ls_flags);
+	wake_up(&ls->ls_wait_member);
+	dlm_recoverd_kick(ls);
+ out:
+	return error;
+}
+
+int dlm_ls_finish(struct dlm_ls *ls, int event_nr)
+{
+	spin_lock(&ls->ls_recover_lock);
+	if (event_nr != ls->ls_last_start) {
+		spin_unlock(&ls->ls_recover_lock);
+		log_error(ls, "finish event_nr %d doesn't match start %d",
+			  event_nr, ls->ls_last_start);
+		return -EINVAL;
+	}
+	ls->ls_last_finish = event_nr;
+	set_bit(LSFL_LS_FINISH, &ls->ls_flags);
+	spin_unlock(&ls->ls_recover_lock);
+	dlm_recoverd_kick(ls);
+	return 0;
+}
+
--- a/drivers/dlm/member.h	1970-01-01 07:30:00.000000000 +0730
+++ b/drivers/dlm/member.h	2005-05-12 23:13:15.829485512 +0800
@@ -0,0 +1,29 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#ifndef __MEMBER_DOT_H__
+#define __MEMBER_DOT_H__
+
+int dlm_ls_terminate(struct dlm_ls *ls);
+int dlm_ls_stop(struct dlm_ls *ls);
+int dlm_ls_start(struct dlm_ls *ls, int event_nr);
+int dlm_ls_finish(struct dlm_ls *ls, int event_nr);
+
+void dlm_clear_members(struct dlm_ls *ls);
+void dlm_clear_members_gone(struct dlm_ls *ls);
+void dlm_clear_members_finish(struct dlm_ls *ls, int finish_event);
+int dlm_recover_members_first(struct dlm_ls *ls, struct dlm_recover *rv);
+int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out);
+int dlm_is_removed(struct dlm_ls *ls, int nodeid);
+
+#endif                          /* __MEMBER_DOT_H__ */
+


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]