[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Cluster-devel] Cluster Project branch, master, updated. gfs-kernel_0_1_22-158-g77bce77



This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "Cluster Project".

http://sources.redhat.com/git/gitweb.cgi?p=cluster.git;a=commitdiff;h=77bce77b5034adf8f00090b13dde7c7d481b0dd9

The branch, master has been updated
       via  77bce77b5034adf8f00090b13dde7c7d481b0dd9 (commit)
      from  d2d49ff4c65cb6c7912c20a4a55b1d7e60cd3a85 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit 77bce77b5034adf8f00090b13dde7c7d481b0dd9
Author: David Teigland <teigland redhat com>
Date:   Wed Mar 19 16:05:20 2008 -0500

    dlm_controld: new version
    
    - uses libcpg directly without libgroup (use the -g0 option)
    - takes over plock handling from gfs_controld
    - interacts with fenced and fs_controld to coordinate recovery (todo)
    - runs in backward compat mode by default, using libgroup to interact
      with old groupd/dlm_controld (-g1 option)
    - plan to add a new default -g2 option that will detect nodes running
      the old groupd in the cluster and only run in old mode if any exist
    
    Signed-off-by: David Teigland <teigland redhat com>

-----------------------------------------------------------------------

Summary of changes:
 group/dlm_controld/Makefile                        |   13 +-
 group/dlm_controld/action.c                        |  443 ++-----
 group/dlm_controld/config.c                        |  288 ++++
 group/dlm_controld/config.h                        |   56 +
 group/dlm_controld/cpg.c                           | 1383 ++++++++++++++++++++
 .../gfs2_disk_hash.h => group/dlm_controld/crc.c   |   13 +-
 group/dlm_controld/deadlock.c                      |  326 +----
 group/dlm_controld/dlm_daemon.h                    |  189 +++-
 group/dlm_controld/group.c                         |   45 +-
 group/dlm_controld/main.c                          |  597 +++++----
 group/dlm_controld/member_cman.c                   |    9 +-
 group/dlm_controld/netlink.c                       |  237 ++++
 group/{gfs_controld => dlm_controld}/plock.c       |  807 ++++++------
 group/include/list.h                               |   11 +
 14 files changed, 3078 insertions(+), 1339 deletions(-)
 create mode 100644 group/dlm_controld/config.c
 create mode 100644 group/dlm_controld/config.h
 create mode 100644 group/dlm_controld/cpg.c
 copy gfs2/include/gfs2_disk_hash.h => group/dlm_controld/crc.c (96%)
 create mode 100644 group/dlm_controld/netlink.c
 copy group/{gfs_controld => dlm_controld}/plock.c (71%)

diff --git a/group/dlm_controld/Makefile b/group/dlm_controld/Makefile
index 9f26a14..8098bad 100644
--- a/group/dlm_controld/Makefile
+++ b/group/dlm_controld/Makefile
@@ -22,11 +22,16 @@ include $(OBJDIR)/make/clean.mk
 include $(OBJDIR)/make/install.mk
 include $(OBJDIR)/make/uninstall.mk
 
-OBJS=	main.o \
+OBJS=	action.o \
+	config.o \
+	cpg.o \
+	crc.o \
+	deadlock.o \
+	main.o \
 	member_cman.o \
-	group.o \
-	action.o \
-	deadlock.o
+	netlink.o \
+	plock.o \
+	group.o
 
 CFLAGS += -I${ccsincdir} -I${cmanincdir} -I${dlmincdir} -I${openaisincdir}
 CFLAGS += -I${KERNEL_SRC}/include/
diff --git a/group/dlm_controld/action.c b/group/dlm_controld/action.c
index 34e84fe..b7c422b 100644
--- a/group/dlm_controld/action.c
+++ b/group/dlm_controld/action.c
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -10,79 +10,128 @@
 *******************************************************************************
 ******************************************************************************/
 
-#include <sys/types.h>
-#include <asm/types.h>
-#include <sys/uio.h>
-#include <netinet/in.h>
-#include <sys/socket.h>
-#include <sys/ioctl.h>
-#include <sys/stat.h>
-#include <sys/utsname.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <net/if.h>
-#include <stdio.h>
-#include <errno.h>
-#include <string.h>
-#include <stdlib.h>
-#include <stddef.h>
-#include <stdint.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <limits.h>
-#include <unistd.h>
-#include <dirent.h>
-
 #include "dlm_daemon.h"
-#include "ccs.h"
+#include "config.h"
 
-static int dir_members[MAX_GROUP_MEMBERS];
+static int dir_members[MAX_NODES];
 static int dir_members_count;
 static int comms_nodes[MAX_NODES];
 static int comms_nodes_count;
+static char mg_name[MAXNAME+1];	/* fsname found by get_mountgroup_name() */
 
 #define DLM_SYSFS_DIR "/sys/kernel/dlm"
 #define CLUSTER_DIR   "/sys/kernel/config/dlm/cluster"
 #define SPACES_DIR    "/sys/kernel/config/dlm/cluster/spaces"
 #define COMMS_DIR     "/sys/kernel/config/dlm/cluster/comms"
 
+/* Find the "clustername:fsname" dir under /sys/fs/gfs (then /sys/fs/gfs2)
+   whose lock_module/id equals mg_id; copy fsname to mg_name. 0 or -1. */
 
-int do_read(int fd, void *buf, size_t count)
+static int get_mountgroup_name(uint32_t mg_id)
 {
-	int rv, off = 0;
+	char path[PATH_MAX];
+	char *fsname, *fsdir;
+	DIR *d;
+	FILE *file;
+	struct dirent *de;
+	uint32_t id;
+	int retry_gfs2 = 1;
+	int rv, error;
 
-	while (off < count) {
-		rv = read(fd, buf + off, count - off);
-		if (rv == 0)
-			return -1;
-		if (rv == -1 && errno == EINTR)
+	fsdir = "/sys/fs/gfs";
+ retry:
+	rv = -1;
+	/* first pass scans /sys/fs/gfs; "out:" retries once with gfs2 */
+	d = opendir(fsdir);
+	if (!d) {
+		log_debug("%s: opendir failed: %d", fsdir, errno);
+		goto out;
+	}
+
+	while ((de = readdir(d))) {
+		if (de->d_name[0] == '.')
 			continue;
-		if (rv == -1)
-			return -1;
-		off += rv;
+
+		id = 0;
+		memset(path, 0, PATH_MAX);
+		snprintf(path, PATH_MAX, "%s/%s/lock_module/id",
+			 fsdir, de->d_name);
+
+		file = fopen(path, "r");
+		if (!file) {
+			log_error("can't open %s %d", path, errno);
+			continue;
+		}
+
+		error = fscanf(file, "%u", &id);
+		fclose(file);
+
+		if (error != 1) {
+			log_error("bad read %s %d", path, errno);
+			continue;
+		}
+		if (id != mg_id) {
+			log_debug("get_mountgroup_name skip %x %s",
+				  id, de->d_name);
+			continue;
+		}
+
+		/* take the fsname out of clustername:fsname */
+		fsname = strchr(de->d_name, ':');
+		if (!fsname) {
+			log_debug("get_mountgroup_name skip2 %x %s",
+				  id, de->d_name);
+			continue;
+		}
+		fsname++;
+
+		log_debug("get_mountgroup_name found %x %s %s",
+			  id, de->d_name, fsname);
+		snprintf(mg_name, sizeof(mg_name), "%s", fsname);
+		rv = 0;
+		break;
 	}
-	return 0;
+
+	closedir(d);
+
+ out:
+	if (rv && retry_gfs2) {
+		retry_gfs2 = 0;
+		fsdir = "/sys/fs/gfs2";
+		goto retry;
+	}
+
+	return rv;
 }
 
-int do_write(int fd, void *buf, size_t count)
+/* find the mountgroup with "mg_id" in sysfs, get its name, then look for
+   the ls with the same name in lockspaces list and set its associated_mg_id */
+
+void set_associated_id(uint32_t mg_id)
 {
-	int rv, off = 0;
+	struct lockspace *ls;
+	int rv;
 
- retry:
-	rv = write(fd, buf + off, count);
-	if (rv == -1 && errno == EINTR)
-		goto retry;
-	if (rv < 0) {
-		log_error("write errno %d", errno);
-		return rv;
+	log_debug("set_associated_id mg_id %x %d", mg_id, mg_id);
+
+	memset(&mg_name, 0, sizeof(mg_name));	/* drop any previous name */
+
+	rv = get_mountgroup_name(mg_id);
+	if (rv) {
+		log_error("no mountgroup found with id %x", mg_id);
+		return;
 	}
 
-	if (rv != count) {
-		count -= rv;
-		off += rv;
-		goto retry;
+	ls = find_ls(mg_name);
+	if (!ls) {
+		log_error("no lockspace found with name %s for mg_id %x",
+			   mg_name, mg_id);
+		return;
 	}
-	return 0;
+
+	log_debug("set_associated_id mg %x is ls %x", mg_id, ls->global_id);
+
+	ls->associated_mg_id = mg_id;
 }
 
 static int do_sysfs(char *name, char *file, char *val)
@@ -105,7 +154,7 @@ static int do_sysfs(char *name, char *file, char *val)
 	return rv;
 }
 
-int set_control(char *name, int val)
+int set_sysfs_control(char *name, int val)
 {
 	char buf[32];
 
@@ -115,7 +164,7 @@ int set_control(char *name, int val)
 	return do_sysfs(name, "control", buf);
 }
 
-int set_event_done(char *name, int val)
+int set_sysfs_event_done(char *name, int val)
 {
 	char buf[32];
 
@@ -125,7 +174,7 @@ int set_event_done(char *name, int val)
 	return do_sysfs(name, "event_done", buf);
 }
 
-int set_id(char *name, uint32_t id)
+int set_sysfs_id(char *name, uint32_t id)
 {
 	char buf[32];
 
@@ -207,122 +256,17 @@ static int path_exists(const char *path)
 	return 1;
 }
 
-static int open_ccs(void)
-{
-	int i, cd;
-
-	while ((cd = ccs_connect()) < 0) {
-		sleep(1);
-		if (++i > 9 && !(i % 10))
-			log_error("connect to ccs error %d, "
-				  "check ccsd or cluster status", cd);
-	}
-	return cd;
-}
-
-/* when not set in cluster.conf, a node's default weight is 1 */
-
-#define MASTER_PATH "/cluster/dlm/lockspace[ name=\"%s\"]/master"
-#define WEIGHT_PATH "/cluster/clusternodes/clusternode[ name=\"%s\"]/@weight"
-
-#define MASTER_NAME   MASTER_PATH "/@name"
-#define MASTER_WEIGHT MASTER_PATH "[ name=\"%s\"]/@weight"
-
-/* look for node's weight in the dlm/lockspace section */
-
-static int get_weight_lockspace(int cd, char *node, char *lockspace)
-{
-	char path[PATH_MAX], *str;
-	int error, weight;
-	int master_count = 0, node_is_master = 0;
-
-	memset(path, 0, PATH_MAX);
-	sprintf(path, MASTER_NAME, lockspace);
-
-	while (1) {
-		error = ccs_get_list(cd, path, &str);
-		if (error || !str)
-			break;
-		master_count++;
-		if (strcmp(str, node) == 0)
-			node_is_master = 1;
-		free(str);
-	}
-
-	/* if there are no masters, next check for a clusternode weight */
-
-	if (!master_count)
-		return -1;
-
-	/* if there's a master and this node isn't it, it gets weight 0 */
-
-	if (!node_is_master)
-		return 0;
-
-	/* master gets its specified weight or 1 if none is given */
-
-	memset(path, 0, PATH_MAX);
-	sprintf(path, MASTER_WEIGHT, lockspace, node);
-
-	error = ccs_get(cd, path, &str);
-	if (error || !str)
-		return 1;
-
-	weight = atoi(str);
-	free(str);
-	return weight;
-}
-
-/* look for node's weight on its clusternode line */
-
-static int get_weight_clusternode(int cd, char *node, char *lockspace)
-{
-	char path[PATH_MAX], *str;
-	int error, weight;
-
-	memset(path, 0, PATH_MAX);
-	sprintf(path, WEIGHT_PATH, node);
-
-	error = ccs_get(cd, path, &str);
-	if (error || !str)
-		return -1;
-
-	weight = atoi(str);
-	free(str);
-	return weight;
-}
-
-static int get_weight(int cd, int nodeid, char *lockspace)
-{
-	char *node;
-	int w;
-
-	node = nodeid2name(nodeid);
-	if (!node) {
-		log_error("no name for nodeid %d", nodeid);
-		w = 1;
-		goto out;
-	}
-
-	w = get_weight_lockspace(cd, node, lockspace);
-	if (w >= 0)
-		goto out;
-
-	w = get_weight_clusternode(cd, node, lockspace);
-	if (w >= 0)
-		goto out;
-
-	/* default weight is 1 */
-	w = 1;
- out:
-	return w;
-}
+/* The "renew" nodes are those that have left and rejoined since the last
+   call to set_configfs_members().  We rmdir/mkdir for these nodes so
+   dlm-kernel can notice they've left and rejoined. */
 
-int set_members(char *name, int new_count, int *new_members)
+int set_configfs_members(char *name, int new_count, int *new_members,
+			 int renew_count, int *renew_members)
 {
 	char path[PATH_MAX];
 	char buf[32];
 	int i, w, fd, rv, id, cd = 0, old_count, *old_members;
+	int do_renew;
 
 	/*
 	 * create lockspace dir if it doesn't exist yet
@@ -383,7 +327,12 @@ int set_members(char *name, int new_count, int *new_members)
 
 	for (i = 0; i < new_count; i++) {
 		id = new_members[i];
-		if (id_exists(id, old_count, old_members))
+
+		do_renew = 0;
+
+		if (id_exists(id, renew_count, renew_members))
+			do_renew = 1;
+		else if (id_exists(id, old_count, old_members))
 			continue;
 
 		if (!is_cman_member(id))
@@ -396,6 +345,16 @@ int set_members(char *name, int new_count, int *new_members)
 		snprintf(path, PATH_MAX, "%s/%s/nodes/%d",
 			 SPACES_DIR, name, id);
 
+		if (do_renew) {
+			log_debug("set_members renew rmdir \"%s\"", path);
+			rv = rmdir(path);
+			if (rv) {
+				log_error("%s: renew rmdir failed: %d",
+					  path, errno);
+				goto out;
+			}
+		}
+
 		log_debug("set_members mkdir \"%s\"", path);
 
 		rv = create_path(path);
@@ -461,7 +420,7 @@ int set_members(char *name, int new_count, int *new_members)
 	rv = 0;
  out:
 	if (cd)
-		ccs_disconnect(cd);
+		close_ccs(cd);
 	return rv;
 }
 
@@ -476,7 +435,7 @@ char *str_ip(char *addr)
 }
 #endif
 
-char *str_ip(char *addr)
+static char *str_ip(char *addr)
 {
 	static char str_ip_buf[INET6_ADDRSTRLEN];
 	struct sockaddr_storage *ss = (struct sockaddr_storage *)addr;
@@ -528,7 +487,7 @@ static int update_comms_nodes(void)
 
 /* clear out everything under config/dlm/cluster/comms/ */
 
-void clear_configfs_comms(void)
+static void clear_configfs_comms(void)
 {
 	char path[PATH_MAX];
 	int i, rv;
@@ -573,7 +532,7 @@ static void clear_configfs_space_nodes(char *name)
 
 /* clear out everything under config/dlm/cluster/spaces/ */
 
-void clear_configfs_spaces(void)
+static void clear_configfs_spaces(void)
 {
 	char path[PATH_MAX];
 	DIR *d;
@@ -749,89 +708,7 @@ void del_configfs_node(int nodeid)
 		log_error("%s: rmdir failed: %d", path, errno);
 }
 
-#define PROTOCOL_PATH "/cluster/dlm/@protocol"
-#define PROTO_TCP  1
-#define PROTO_SCTP 2
-
-static int get_ccs_protocol(int cd)
-{
-	char path[PATH_MAX], *str;
-	int error, rv;
-
-	memset(path, 0, PATH_MAX);
-	sprintf(path, PROTOCOL_PATH);
-
-	error = ccs_get(cd, path, &str);
-	if (error || !str)
-		return -1;
-
-	if (!strncasecmp(str, "tcp", 3))
-		rv = PROTO_TCP;
-	else if (!strncasecmp(str, "sctp", 4))
-		rv = PROTO_SCTP;
-	else {
-		log_error("read invalid dlm protocol from ccs");
-		rv = 0;
-	}
-
-	free(str);
-	log_debug("got ccs protocol %d", rv);
-	return rv;
-}
-
-#define TIMEWARN_PATH "/cluster/dlm/@timewarn"
-
-static int get_ccs_timewarn(int cd)
-{
-	char path[PATH_MAX], *str;
-	int error, rv;
-
-	memset(path, 0, PATH_MAX);
-	sprintf(path, TIMEWARN_PATH);
-
-	error = ccs_get(cd, path, &str);
-	if (error || !str)
-		return -1;
-
-	rv = atoi(str);
-
-	if (rv <= 0) {
-		log_error("read invalid dlm timewarn from ccs");
-		rv = -1;
-	}
-
-	free(str);
-	log_debug("got ccs timewarn %d", rv);
-	return rv;
-}
-
-#define DEBUG_PATH "/cluster/dlm/@log_debug"
-
-static int get_ccs_debug(int cd)
-{
-	char path[PATH_MAX], *str;
-	int error, rv;
-
-	memset(path, 0, PATH_MAX);
-	sprintf(path, DEBUG_PATH);
-
-	error = ccs_get(cd, path, &str);
-	if (error || !str)
-		return -1;
-
-	rv = atoi(str);
-
-	if (rv < 0) {
-		log_error("read invalid dlm log_debug from ccs");
-		rv = -1;
-	}
-
-	free(str);
-	log_debug("got ccs log_debug %d", rv);
-	return rv;
-}
-
-static int set_configfs_protocol(int proto)
+int set_configfs_protocol(int proto)
 {
 	char path[PATH_MAX];
 	char buf[32];
@@ -863,7 +740,7 @@ static int set_configfs_protocol(int proto)
 	return 0;
 }
 
-static int set_configfs_timewarn(int cs)
+int set_configfs_timewarn(int cs)
 {
 	char path[PATH_MAX];
 	char buf[32];
@@ -895,7 +772,7 @@ static int set_configfs_timewarn(int cs)
 	return 0;
 }
 
-static int set_configfs_debug(int val)
+int set_configfs_debug(int val)
 {
 	char path[PATH_MAX];
 	char buf[32];
@@ -927,59 +804,3 @@ static int set_configfs_debug(int val)
 	return 0;
 }
 
-static void set_protocol(int cd)
-{
-	int rv, proto;
-
-	rv = get_ccs_protocol(cd);
-	if (!rv || rv < 0)
-		return;
-
-	/* for dlm kernel, TCP=0 and SCTP=1 */
-	if (rv == PROTO_TCP)
-		proto = 0;
-	else if (rv == PROTO_SCTP)
-		proto = 1;
-	else
-		return;
-
-	set_configfs_protocol(proto);
-}
-
-static void set_timewarn(int cd)
-{
-	int rv;
-
-	rv = get_ccs_timewarn(cd);
-	if (rv < 0)
-		return;
-
-	set_configfs_timewarn(rv);
-}
-
-static void set_debug(int cd)
-{
-	int rv;
-
-	rv = get_ccs_debug(cd);
-	if (rv < 0)
-		return;
-
-	set_configfs_debug(rv);
-}
-
-void set_ccs_options(void)
-{
-	int cd;
-
-	cd = open_ccs();
-
-	log_debug("set_ccs_options %d", cd);
-
-	set_protocol(cd);
-	set_timewarn(cd);
-	set_debug(cd);
-
-	ccs_disconnect(cd);
-}
-
diff --git a/group/dlm_controld/config.c b/group/dlm_controld/config.c
new file mode 100644
index 0000000..6a4fa73
--- /dev/null
+++ b/group/dlm_controld/config.c
@@ -0,0 +1,288 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include <sys/types.h>
+#include <asm/types.h>
+#include <sys/uio.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <sys/stat.h>
+#include <sys/utsname.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <limits.h>
+#include <unistd.h>
+#include <dirent.h>
+
+#include "dlm_daemon.h"
+#include "config.h"
+#include "ccs.h"
+
+#define PROTO_TCP  1	/* cluster.conf "tcp"; the dlm kernel value is 0 */
+#define PROTO_SCTP 2	/* cluster.conf "sctp"; the dlm kernel value is 1 */
+
+/* 1 if the value was given on the command line, in which case read_ccs()
+   skips cluster.conf for it.  optk = kernel option, optd = daemon option */
+
+int optk_debug;
+int optk_timewarn;
+int optk_protocol;
+int optd_groupd_compat;
+int optd_enable_deadlk;
+int optd_enable_plock;
+int optd_plock_debug;
+int optd_plock_rate_limit;
+int optd_plock_ownership;
+int optd_drop_resources_time;
+int optd_drop_resources_count;
+int optd_drop_resources_age;
+
+/* effective value from command line, cluster.conf, or default; cfgk = kernel
+   value (-1 = unset, kernel default kept), cfgd = daemon value */
+
+int cfgk_debug			= -1;
+int cfgk_timewarn		= -1;
+int cfgk_protocol		= -1;
+int cfgd_groupd_compat		= DEFAULT_GROUPD_COMPAT;
+int cfgd_enable_deadlk		= DEFAULT_ENABLE_DEADLK;
+int cfgd_enable_plock		= DEFAULT_ENABLE_PLOCK;
+int cfgd_plock_debug		= DEFAULT_PLOCK_DEBUG;
+int cfgd_plock_rate_limit	= DEFAULT_PLOCK_RATE_LIMIT;
+int cfgd_plock_ownership	= DEFAULT_PLOCK_OWNERSHIP;
+int cfgd_drop_resources_time	= DEFAULT_DROP_RESOURCES_TIME;
+int cfgd_drop_resources_count	= DEFAULT_DROP_RESOURCES_COUNT;
+int cfgd_drop_resources_age	= DEFAULT_DROP_RESOURCES_AGE;
+
+
+/* ccs XPath queries for dlm weights (a node's default weight is 1); the
+   predicates select the XML attribute name, hence "[@name=...]". */
+#define MASTER_PATH "/cluster/dlm/lockspace[@name=\"%s\"]/master"
+#define WEIGHT_PATH "/cluster/clusternodes/clusternode[@name=\"%s\"]/@weight"
+#define MASTER_NAME   MASTER_PATH "/@name"
+#define MASTER_WEIGHT MASTER_PATH "[@name=\"%s\"]/@weight"
+
+/* Weight from the dlm/lockspace section: -1 if the lockspace lists no
+   masters, 0 if it does but node is not one, else the master's weight or 1 */
+static int get_weight_lockspace(int cd, char *node, char *lockspace)
+{
+	char path[PATH_MAX], *str;
+	int error, weight;
+	int master_count = 0, node_is_master = 0;
+
+	memset(path, 0, PATH_MAX);
+	snprintf(path, PATH_MAX, MASTER_NAME, lockspace);
+
+	while (1) {
+		error = ccs_get_list(cd, path, &str);
+		if (error || !str)
+			break;
+		master_count++;
+		if (strcmp(str, node) == 0)
+			node_is_master = 1;
+		free(str);
+	}
+
+	/* if there are no masters, next check for a clusternode weight */
+
+	if (!master_count)
+		return -1;
+
+	/* if there's a master and this node isn't it, it gets weight 0 */
+
+	if (!node_is_master)
+		return 0;
+
+	/* master gets its specified weight or 1 if none is given */
+
+	memset(path, 0, PATH_MAX);
+	snprintf(path, PATH_MAX, MASTER_WEIGHT, lockspace, node);
+
+	error = ccs_get(cd, path, &str);
+	if (error || !str)
+		return 1;
+
+	weight = atoi(str);
+	free(str);
+	return weight;
+}
+
+/* Weight from node's clusternode line, or -1 if it has none.  lockspace is
+   unused (presumably kept to mirror get_weight_lockspace's signature). */
+static int get_weight_clusternode(int cd, char *node, char *lockspace)
+{
+	char path[PATH_MAX], *str;
+	int error, weight;
+
+	memset(path, 0, PATH_MAX);
+	snprintf(path, PATH_MAX, WEIGHT_PATH, node);
+
+	error = ccs_get(cd, path, &str);
+	if (error || !str)
+		return -1;
+
+	weight = atoi(str);
+	free(str);
+	return weight;
+}
+
+/* Return nodeid's dlm weight for lockspace.  A dlm/lockspace master entry
+   takes precedence, then the clusternode weight attribute; otherwise the
+   weight defaults to 1, which is also used when nodeid has no known
+   name. */
+int get_weight(int cd, int nodeid, char *lockspace)
+{
+	char *name = nodeid2name(nodeid);
+	int weight;
+
+	if (name == NULL) {
+		log_error("no name for nodeid %d", nodeid);
+		return 1;
+	}
+
+	weight = get_weight_lockspace(cd, name, lockspace);
+	if (weight >= 0)
+		return weight;
+
+	weight = get_weight_clusternode(cd, name, lockspace);
+	if (weight >= 0)
+		return weight;
+
+	/* neither section sets a weight: default is 1 */
+	return 1;
+}
+
+/* Connect to ccsd, retrying every second; logs every 10th failed attempt. */
+int open_ccs(void)
+{
+	int cd, tries;
+
+	for (tries = 1; (cd = ccs_connect()) < 0; tries++) {
+		sleep(1);
+		if (tries % 10 == 0)
+			log_error("connect to ccs error %d, check ccsd or cluster status", cd);
+	}
+	return cd;
+}
+
+void close_ccs(int cd)	/* wrapper so callers need not include ccs.h */
+{
+	(void) ccs_disconnect(cd);
+}
+
+static void read_ccs_int(int cd, char *path, int *config_val)
+{
+	char *str;
+	int val, error;
+
+	/* *config_val is left unchanged if path is unset or negative */
+	error = ccs_get(cd, path, &str);
+	if (error || !str)
+		return;
+
+	val = atoi(str);
+	free(str);	/* only the parsed value is needed from here on */
+
+	if (val < 0) {
+		log_error("ignore invalid value %d for %s", val, path);
+		return;
+	}
+
+	*config_val = val;
+	log_debug("%s is %d", path, val);
+}
+
+static void read_ccs_protocol(int cd, char *path, int *config_val)
+{
+	char *str;
+	int val, error;
+
+	error = ccs_get(cd, path, &str);
+	if (error || !str)
+		return;
+
+	if (!strncasecmp(str, "tcp", 3))	/* case-insensitive prefix match */
+		val = PROTO_TCP;
+	else if (!strncasecmp(str, "sctp", 4))
+		val = PROTO_SCTP;
+	else {
+		log_error("ignore invalid value %s for %s", str, path);
+		free(str);
+		return;
+	}
+
+	*config_val = val;
+	log_debug("%s is %d (%s)", path, val, str);
+	free(str);
+}
+
+#define DEBUG_PATH "/cluster/dlm/@log_debug"
+#define TIMEWARN_PATH "/cluster/dlm/@timewarn"
+#define PROTOCOL_PATH "/cluster/dlm/@protocol"
+#define GROUPD_COMPAT_PATH "/cluster/dlm/@groupd_compat"
+#define ENABLE_DEADLK_PATH "/cluster/dlm/@enable_deadlk"
+#define ENABLE_PLOCK_PATH "/cluster/dlm/@enable_plock"
+#define PLOCK_DEBUG_PATH "/cluster/dlm/@plock_debug"
+#define PLOCK_RATE_LIMIT_PATH "/cluster/dlm/@plock_rate_limit"
+#define PLOCK_OWNERSHIP_PATH "/cluster/dlm/@plock_ownership"
+#define DROP_RESOURCES_TIME_PATH "/cluster/dlm/@drop_resources_time"
+#define DROP_RESOURCES_COUNT_PATH "/cluster/dlm/@drop_resources_count"
+#define DROP_RESOURCES_AGE_PATH "/cluster/dlm/@drop_resources_age"
+
+/* Read each value from cluster.conf unless its opt flag shows it was already
+   set on the command line; absent or invalid entries keep their value. */
+
+void read_ccs(void)
+{
+	int cd;
+
+	cd = open_ccs();
+	if (cd < 0)	/* defensive: open_ccs() retries until it connects */
+		return;
+
+	if (!optk_debug)
+		read_ccs_int(cd, DEBUG_PATH, &cfgk_debug);
+	if (!optk_timewarn)
+		read_ccs_int(cd, TIMEWARN_PATH, &cfgk_timewarn);
+	if (!optk_protocol)
+		read_ccs_protocol(cd, PROTOCOL_PATH, &cfgk_protocol);
+	if (!optd_groupd_compat)
+		read_ccs_int(cd, GROUPD_COMPAT_PATH, &cfgd_groupd_compat);
+	if (!optd_enable_deadlk)
+		read_ccs_int(cd, ENABLE_DEADLK_PATH, &cfgd_enable_deadlk);
+	if (!optd_enable_plock)
+		read_ccs_int(cd, ENABLE_PLOCK_PATH, &cfgd_enable_plock);
+	if (!optd_plock_debug)
+		read_ccs_int(cd, PLOCK_DEBUG_PATH, &cfgd_plock_debug);
+	if (!optd_plock_rate_limit)
+		read_ccs_int(cd, PLOCK_RATE_LIMIT_PATH, &cfgd_plock_rate_limit);
+	if (!optd_plock_ownership)
+		read_ccs_int(cd, PLOCK_OWNERSHIP_PATH, &cfgd_plock_ownership);
+	if (!optd_drop_resources_time)
+		read_ccs_int(cd, DROP_RESOURCES_TIME_PATH, &cfgd_drop_resources_time);
+	if (!optd_drop_resources_count)
+		read_ccs_int(cd, DROP_RESOURCES_COUNT_PATH, &cfgd_drop_resources_count);
+	if (!optd_drop_resources_age)
+		read_ccs_int(cd, DROP_RESOURCES_AGE_PATH, &cfgd_drop_resources_age);
+
+	close_ccs(cd);
+}
+
diff --git a/group/dlm_controld/config.h b/group/dlm_controld/config.h
new file mode 100644
index 0000000..0ae121d
--- /dev/null
+++ b/group/dlm_controld/config.h
@@ -0,0 +1,56 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2008 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+/* the kernel has default values for debug, timewarn and protocol;
+   we only change them if new values are given on command line or in ccs */
+
+#define DEFAULT_GROUPD_COMPAT 1	/* -g1: libgroup backward compat mode */
+#define DEFAULT_ENABLE_DEADLK 0
+#define DEFAULT_ENABLE_PLOCK 1
+#define DEFAULT_PLOCK_DEBUG 0
+#define DEFAULT_PLOCK_RATE_LIMIT 100
+#define DEFAULT_PLOCK_OWNERSHIP 1
+#define DEFAULT_DROP_RESOURCES_TIME 10000 /* 10 sec */
+#define DEFAULT_DROP_RESOURCES_COUNT 10
+#define DEFAULT_DROP_RESOURCES_AGE 10000 /* 10 sec */
+
+extern int optk_debug;	/* optk_, optd_: 1 if given on the command line */
+extern int optk_timewarn;
+extern int optk_protocol;
+extern int optd_groupd_compat;
+extern int optd_enable_deadlk;
+extern int optd_enable_plock;
+extern int optd_plock_debug;
+extern int optd_plock_rate_limit;
+extern int optd_plock_ownership;
+extern int optd_drop_resources_time;
+extern int optd_drop_resources_count;
+extern int optd_drop_resources_age;
+
+extern int cfgk_debug;	/* cfgk_, cfgd_: effective values (cfgk_ -1 = unset) */
+extern int cfgk_timewarn;
+extern int cfgk_protocol;
+extern int cfgd_groupd_compat;
+extern int cfgd_enable_deadlk;
+extern int cfgd_enable_plock;
+extern int cfgd_plock_debug;
+extern int cfgd_plock_rate_limit;
+extern int cfgd_plock_ownership;
+extern int cfgd_drop_resources_time;
+extern int cfgd_drop_resources_count;
+extern int cfgd_drop_resources_age;
+
+void read_ccs(void);	/* fill cfg values from cluster.conf if not on cmdline */
+int open_ccs(void);	/* blocks, retrying, until ccsd accepts the connection */
+void close_ccs(int cd);
+int get_weight(int cd, int nodeid, char *lockspace);	/* 1 if unconfigured */
+
diff --git a/group/dlm_controld/cpg.c b/group/dlm_controld/cpg.c
new file mode 100644
index 0000000..21c1a43
--- /dev/null
+++ b/group/dlm_controld/cpg.c
@@ -0,0 +1,1383 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2007-2008 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include "dlm_daemon.h"
+#include "config.h"
+
+uint32_t cpgname_to_crc(const char *data, int len);
+
+/* Non-zero while cpg flow control is on for daemon_handle; maintained by
+   update_flow_control_status(). */
+int message_flow_control_on;
+static cpg_handle_t daemon_handle;
+/* Protocol version written into every outgoing dlm_header by
+   dlm_send_message(); deliver_cb() rejects messages whose major version
+   (element 0) differs. */
+static unsigned int protocol_active[3] = {1, 0, 0};
+
+/* State of one node within one change (confchg).  Used both for the
+   change's current members and for the nodes it removed. */
+struct member {
+	struct list_head list;
+	int nodeid;
+	int start;   /* 1 if we received a start message for this change */
+	int added;   /* 1 if added by this change */
+	int failed;  /* 1 if failed in this change */
+	int disallowed; /* 1 if its start msg contradicted "added" (ignored) */
+	uint32_t start_flags; /* flags (DLM_MFLG_*) from its start message */
+};
+
+/* Per-lockspace history of a node, consulted when deciding whether the
+   node must be fenced after it fails (see the node_history_* helpers). */
+struct node {
+	struct list_head list;
+	int nodeid;
+	int needs_fencing;
+	struct timeval add_time; /* set on an accepted start msg, cleared on leave */
+};
+
+/* One of these change structs is created for every confchg a cpg gets. */
+
+/* values of change.state, advanced by apply_changes() */
+#define CGST_WAIT_CONDITIONS 1
+#define CGST_WAIT_MESSAGES   2
+
+struct change {
+	struct list_head list;
+	struct list_head members;
+	struct list_head removed; /* nodes removed by this change */
+	int member_count;
+	int joined_count;
+	int remove_count;
+	int failed_count;
+	int state;     /* CGST_* */
+	int we_joined; /* 1 if our_nodeid is in this change's joined list */
+	uint32_t seq; /* just used as a reference when debugging */
+};
+
+/* Map a DLM_MSG_* message type to a short printable name for logging;
+   unrecognised types map to "unknown". */
+char *msg_name(int type)
+{
+	char *name;
+
+	switch (type) {
+	case DLM_MSG_START:
+		name = "start";
+		break;
+	case DLM_MSG_PLOCK:
+		name = "plock";
+		break;
+	case DLM_MSG_PLOCK_OWN:
+		name = "plock_own";
+		break;
+	case DLM_MSG_PLOCK_DROP:
+		name = "plock_drop";
+		break;
+	case DLM_MSG_PLOCK_SYNC_LOCK:
+		name = "plock_sync_lock";
+		break;
+	case DLM_MSG_PLOCK_SYNC_WAITER:
+		name = "plock_sync_waiter";
+		break;
+	case DLM_MSG_PLOCKS_STORED:
+		name = "plocks_stored";
+		break;
+	case DLM_MSG_DEADLK_CYCLE_START:
+		name = "deadlk_cycle_start";
+		break;
+	case DLM_MSG_DEADLK_CYCLE_END:
+		name = "deadlk_cycle_end";
+		break;
+	case DLM_MSG_DEADLK_CHECKPOINT_READY:
+		name = "deadlk_checkpoint_ready";
+		break;
+	case DLM_MSG_DEADLK_CANCEL_LOCK:
+		name = "deadlk_cancel_lock";
+		break;
+	default:
+		name = "unknown";
+		break;
+	}
+
+	return name;
+}
+
+/* Render n_ints little-endian ints from nums as "a b c " for logging.
+   The result lives in a static buffer that is overwritten by the next
+   call; output stops at the last number that fits completely. */
+static char *str_nums(int *nums, int n_ints)
+{
+	static char buf[128];
+	int avail = sizeof(buf);
+	int used = 0;
+	int idx = 0;
+
+	memset(buf, 0, avail);
+
+	while (idx < n_ints) {
+		int wrote = snprintf(buf + used, avail - used, "%d ",
+				     le32_to_cpu(nums[idx]));
+
+		/* stop once snprintf reports truncation */
+		if (wrote >= avail - used)
+			break;
+		used += wrote;
+		idx++;
+	}
+
+	return buf;
+}
+
+/* Multicast len bytes of buf to the cpg behind handle h with agreed
+   ordering.  While cpg answers CPG_ERR_TRY_AGAIN we sleep 1ms and retry
+   indefinitely, logging every 100th retry.  type is only used for log
+   text.  Returns 0 on success, -1 on any other cpg error. */
+static int _send_message(cpg_handle_t h, void *buf, int len, int type)
+{
+	struct iovec iov = { .iov_base = buf, .iov_len = len };
+	cpg_error_t error;
+	int retries = 0;
+
+	for (;;) {
+		error = cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1);
+		if (error != CPG_ERR_TRY_AGAIN)
+			break;
+
+		retries++;
+		usleep(1000);
+		if (retries % 100 == 0)
+			log_error("cpg_mcast_joined retry %d %s",
+				   retries, msg_name(type));
+	}
+
+	if (error != CPG_OK) {
+		log_error("cpg_mcast_joined error %d handle %llx %s",
+			  error, (unsigned long long)h, msg_name(type));
+		return -1;
+	}
+
+	if (retries != 0)
+		log_debug("cpg_mcast_joined retried %d %s",
+			  retries, msg_name(type));
+
+	return 0;
+}
+
+/* header fields caller needs to set: type, to_nodeid, flags, msgdata */
+
+/* Finish the dlm_header at the front of buf and multicast buf on the
+   lockspace cpg.  The caller sets type, to_nodeid, flags and msgdata in
+   host order; this fills in version, nodeid and global_id and converts
+   every header field to little-endian in place.  The result of the send
+   is not reported to the caller (_send_message logs failures). */
+void dlm_send_message(struct lockspace *ls, char *buf, int len)
+{
+	struct dlm_header *hdr = (struct dlm_header *) buf;
+	int msg_type = hdr->type;	/* host-order copy, for logging */
+	int v;
+
+	for (v = 0; v < 3; v++)
+		hdr->version[v] = cpu_to_le16(protocol_active[v]);
+
+	hdr->type      = cpu_to_le16(hdr->type);
+	hdr->nodeid    = cpu_to_le32(our_nodeid);
+	hdr->to_nodeid = cpu_to_le32(hdr->to_nodeid);
+	hdr->global_id = cpu_to_le32(ls->global_id);
+	hdr->flags     = cpu_to_le32(hdr->flags);
+	hdr->msgdata   = cpu_to_le32(hdr->msgdata);
+
+	_send_message(ls->cpg_handle, buf, len, msg_type);
+}
+
+/* Look up nodeid among the current members of cg; NULL if absent. */
+static struct member *find_memb(struct change *cg, int nodeid)
+{
+	struct member *m;
+	struct member *found = NULL;
+
+	list_for_each_entry(m, &cg->members, list) {
+		if (m->nodeid != nodeid)
+			continue;
+		found = m;
+		break;
+	}
+
+	return found;
+}
+
+/* Find the lockspace that owns cpg handle h; NULL if none does. */
+static struct lockspace *find_ls_handle(cpg_handle_t h)
+{
+	struct lockspace *cur;
+	struct lockspace *found = NULL;
+
+	list_for_each_entry(cur, &lockspaces, list) {
+		if (cur->cpg_handle != h)
+			continue;
+		found = cur;
+		break;
+	}
+
+	return found;
+}
+
+/* Find the lockspace whose cpg is registered as client ci; NULL if none. */
+static struct lockspace *find_ls_ci(int ci)
+{
+	struct lockspace *cur;
+	struct lockspace *found = NULL;
+
+	list_for_each_entry(cur, &lockspaces, list) {
+		if (cur->cpg_client != ci)
+			continue;
+		found = cur;
+		break;
+	}
+
+	return found;
+}
+
+/* Unlink and free every struct member on the list headed by head. */
+static void free_cg_member_list(struct list_head *head)
+{
+	struct member *m, *next;
+
+	list_for_each_entry_safe(m, next, head, list) {
+		list_del(&m->list);
+		free(m);
+	}
+}
+
+/* Free a change together with its members and removed lists.  cg must
+   be non-NULL; unlinking cg itself from any list is the caller's job. */
+static void free_cg(struct change *cg)
+{
+	free_cg_member_list(&cg->members);
+	free_cg_member_list(&cg->removed);
+	free(cg);
+}
+
+/* Release a lockspace and everything hanging off it: pending changes,
+   the last started change and the node history.  Removing ls from the
+   lockspaces list is up to the caller. */
+static void free_ls(struct lockspace *ls)
+{
+	struct change *c, *c_next;
+	struct node *n, *n_next;
+
+	list_for_each_entry_safe(c, c_next, &ls->changes, list) {
+		list_del(&c->list);
+		free_cg(c);
+	}
+
+	if (ls->started_change != NULL)
+		free_cg(ls->started_change);
+
+	list_for_each_entry_safe(n, n_next, &ls->node_history, list) {
+		list_del(&n->list);
+		free(n);
+	}
+
+	free(ls);
+}
+
+
+/* Problem scenario:
+   nodes A,B,C are in fence domain
+   node C has gfs foo mounted
+   node C fails
+   nodes A,B begin fencing C (slow, not completed)
+   node B mounts gfs foo
+
+   We may end up having gfs foo mounted and being used on B before
+   C has been fenced.  C could then wake up and corrupt the fs.
+
+   So, we need to prevent any new gfs mounts while there are any
+   outstanding, incomplete fencing operations.
+
+   We also need to check that the specific failed nodes we know about have
+   been fenced (since fenced may not even have been notified that the node
+   has failed yet).
+
+   So, check that:
+   1. has the fenced daemon fenced the node since it joined this lockspace?
+   2. fenced has no outstanding fencing ops
+
+   For 1:
+   - record the time of the first good start message we see from node X
+   - node X fails
+   - wait for X to be removed from all dlm cpg's  (probably not necessary)
+   - check that the fencing time is later than the recorded time above
+
+   Tracking fencing state when there are spurious partitions/merges...
+
+   from a spurious leave/join of node X, a lockspace will see:
+   - node X is a lockspace member
+   - node X fails, may be waiting for all cpgs to see failure or for fencing to
+     complete
+   - node X joins the lockspace - we want to process the change as usual, but
+     don't want to disrupt the code waiting for the fencing, and we want to
+     continue running properly once the remerged node is properly reset
+
+   ls->node_history
+   when we see a node not in this list, add entry for it with zero add_time
+   record the time we get a good start message from the node, add_time
+   clear add_time if the node leaves
+   if node fails with non-zero add_time, set needs_fencing
+   when a node is fenced, clear add_time and clear needs_fencing
+   if a node remerges after this, no good start message, no new add_time set
+   if a node fails with zero add_time, it doesn't need fencing
+   if a node remerges before it's been fenced, no good start message, no new
+   add_time set 
+*/
+
+/* Return the node_history entry of ls for nodeid, or NULL if none. */
+static struct node *get_node_history(struct lockspace *ls, int nodeid)
+{
+	struct node *entry;
+	struct node *found = NULL;
+
+	list_for_each_entry(entry, &ls->node_history, list) {
+		if (entry->nodeid != nodeid)
+			continue;
+		found = entry;
+		break;
+	}
+
+	return found;
+}
+
+/* Make sure ls has a history entry for nodeid, creating a zeroed one
+   (no add_time, not needing fencing) at the tail if it is missing.
+   An allocation failure is silently ignored. */
+static void node_history_init(struct lockspace *ls, int nodeid)
+{
+	struct node *entry;
+
+	if (get_node_history(ls, nodeid) != NULL)
+		return;
+
+	entry = malloc(sizeof(*entry));
+	if (entry == NULL)
+		return;
+
+	memset(entry, 0, sizeof(*entry));
+	entry->nodeid = nodeid;
+	timerclear(&entry->add_time);
+	list_add_tail(&entry->list, &ls->node_history);
+}
+
+/* Stamp the current time as nodeid's add_time; called when an accepted
+   start message arrives from it.  Logs an error if nodeid has no entry. */
+static void node_history_start(struct lockspace *ls, int nodeid)
+{
+	struct node *entry = get_node_history(ls, nodeid);
+
+	if (entry == NULL) {
+		log_error("node_history_start no nodeid %d", nodeid);
+		return;
+	}
+
+	gettimeofday(&entry->add_time, NULL);
+}
+
+static void node_history_left(struct lockspace *ls, int nodeid)
+{
+	struct node *node;
+
+	node = get_node_history(ls, nodeid);
+	if (!node) {
+		log_error("node_history_left no nodeid %d", nodeid);
+		return;
+	}
+
+	timerclear(&node->add_time);
+}
+
+/* nodeid failed (as opposed to leaving cleanly).  Following the
+   node_history rules in the comment block above: a node that fails with
+   a non-zero add_time (we saw a good start message from it since it last
+   left or was fenced) needs fencing; a node that fails with a zero
+   add_time does not.  Logs an error if nodeid has no history entry. */
+static void node_history_fail(struct lockspace *ls, int nodeid)
+{
+	struct node *node;
+
+	node = get_node_history(ls, nodeid);
+	if (!node) {
+		log_error("node_history_fail no nodeid %d", nodeid);
+		return;
+	}
+
+	/* only a node we recorded as added to this lockspace needs fencing */
+	if (timerisset(&node->add_time))
+		node->needs_fencing = 1;
+}
+
+/* Intended to return 1 only when every node in ls->node_history that
+   needs fencing has been fenced since its add_time AND fenced has no
+   outstanding fencing operations, and 0 otherwise.
+
+   NOTE: the check is currently disabled with #if 0, so this always
+   returns 1.  The disabled code is only a sketch.  It would not compile
+   as written: it compares two struct timeval values with "<=", and
+   "pending" is never declared. */
+static int failed_nodes_fenced(struct lockspace *ls)
+{
+#if 0
+	struct node *node;
+	struct timeval last_fenced;
+	int wait_count = 0;
+
+	list_for_each_entry(node, &ls->node_history, list) {
+		if (!node->needs_fencing)
+			continue;
+
+		/* check with fenced to see if the node has been
+		   fenced since node->add_time */
+
+		fencedomain_last_success(node->nodeid, &last_fenced);
+
+		if (last_fenced <= node->add_time) {
+			wait_count++;
+			continue;
+		}
+
+		/* node has been fenced */
+		node->needs_fencing = 0;
+		timerclear(&node->add_time);
+	}
+
+	if (wait_count) {
+		return 0;
+	}
+
+	/* now check if there are any outstanding fencing ops (for nodes
+	   we may not have seen in any lockspace), and return 0 if there
+	   are any */
+
+	fencedomain_pending_count(&pending);
+	if (pending)
+		return 0;
+#endif
+	return 1;
+}
+
+/* Placeholder that always reports quorum.  The intended check: verify
+   cman_last_failure_time() for a failed node is more recent than when we
+   last saw the node added, so cman's quorum result accounts for that
+   failure. */
+static int cluster_has_quorum(struct lockspace *ls)
+{
+	(void)ls;	/* unused until the real check exists */
+	return 1;
+}
+
+/* Placeholder that always reports success.  The intended check: ask the
+   fs daemon, through the fscontrol:hostname cpg, whether the fs has been
+   notified of every node failure in this change. */
+static int cluster_filesystem_stopped(struct lockspace *ls)
+{
+	(void)ls;	/* unused until the real check exists */
+	return 1;
+}
+
+/* Scratch id lists rebuilt by format_member_ids() and format_renew_ids()
+   and handed to set_configfs_members() in start_kernel(). */
+static int member_ids[MAX_NODES];
+static int member_count;
+static int renew_ids[MAX_NODES];
+static int renew_count;
+
+/* Fill member_ids/member_count with the nodeids of the newest change
+   (the head of ls->changes; add_change() inserts at the head).  At most
+   MAX_NODES ids fit in member_ids; any excess is logged and dropped
+   instead of overflowing the array. */
+static void format_member_ids(struct lockspace *ls)
+{
+	struct change *cg = list_first_entry(&ls->changes, struct change, list);
+	struct member *memb;
+
+	memset(member_ids, 0, sizeof(member_ids));
+	member_count = 0;
+
+	list_for_each_entry(memb, &cg->members, list) {
+		if (member_count >= MAX_NODES) {
+			log_error("format_member_ids too many members %d",
+				  cg->member_count);
+			break;
+		}
+		member_ids[member_count++] = memb->nodeid;
+	}
+}
+
+/* list of nodeids that have left and rejoined since last start_kernel;
+   is any member of startcg in the left list of any other cg's?
+   (if it is, then it presumably must be flagged added in another) */
+
+/* Return 1 if nodeid appears in the removed list of any change on
+   ls->changes other than skip_cg, else 0. */
+static int removed_in_other_change(struct lockspace *ls,
+				   struct change *skip_cg, int nodeid)
+{
+	struct change *cg;
+	struct member *leftmemb;
+
+	list_for_each_entry(cg, &ls->changes, list) {
+		if (cg == skip_cg)
+			continue;
+		list_for_each_entry(leftmemb, &cg->removed, list) {
+			if (leftmemb->nodeid == nodeid)
+				return 1;
+		}
+	}
+	return 0;
+}
+
+/* Fill renew_ids/renew_count with the members of the newest change (the
+   head of ls->changes) that were removed by some other pending change.
+   Each nodeid is recorded at most once, even if it left and rejoined
+   several times.  At most MAX_NODES ids are stored; any excess is
+   logged and dropped. */
+static void format_renew_ids(struct lockspace *ls)
+{
+	struct change *startcg;
+	struct member *memb;
+
+	startcg = list_first_entry(&ls->changes, struct change, list);
+
+	memset(renew_ids, 0, sizeof(renew_ids));
+	renew_count = 0;
+
+	list_for_each_entry(memb, &startcg->members, list) {
+		if (!removed_in_other_change(ls, startcg, memb->nodeid))
+			continue;
+
+		if (renew_count >= MAX_NODES) {
+			log_error("format_renew_ids too many ids");
+			break;
+		}
+		renew_ids[renew_count++] = memb->nodeid;
+	}
+}
+
+/* Apply the newest change (head of ls->changes) to dlm-kernel: write the
+   member and renew id lists through set_configfs_members(), then set the
+   sysfs control to 1 (the counterpart of stop_kernel()'s 0).  On the
+   first start after joining, also set the lockspace global id and call
+   set_sysfs_event_done(name, 0) -- presumably completing the kernel's
+   pending join request; TODO confirm.  If the kernel lockspace is not
+   currently stopped, only an error is logged. */
+static void start_kernel(struct lockspace *ls)
+{
+	struct change *cg = list_first_entry(&ls->changes, struct change, list);
+
+	if (!ls->kernel_stopped) {
+		log_error("start_kernel %u not stopped", cg->seq);
+		return;
+	}
+
+	log_group(ls, "start_kernel %u member_count %d",
+		  cg->seq, cg->member_count);
+
+	/* fill the member_ids/renew_ids scratch arrays used just below */
+	format_member_ids(ls);
+	format_renew_ids(ls);
+	set_configfs_members(ls->name, member_count, member_ids,
+			     renew_count, renew_ids);
+	set_sysfs_control(ls->name, 1);
+	ls->kernel_stopped = 0;
+
+	if (ls->joining) {
+		set_sysfs_id(ls->name, ls->global_id);
+		set_sysfs_event_done(ls->name, 0);
+		ls->joining = 0;
+	}
+}
+
+/* Stop the kernel lockspace (sysfs control 0) unless it is already
+   stopped, in which case nothing happens.  seq is only used for logging. */
+static void stop_kernel(struct lockspace *ls, uint32_t seq)
+{
+	if (ls->kernel_stopped)
+		return;
+
+	log_group(ls, "stop_kernel %u", seq);
+	set_sysfs_control(ls->name, 0);
+	ls->kernel_stopped = 1;
+}
+
+/* the first condition is that the local lockspace is stopped which we
+   don't need to check for because stop_kernel(), which is synchronous,
+   was done when the change was created */
+
+/* Return 1 when fencing, quorum and filesystem conditions all allow the
+   pending change to proceed, else 0.  Checks run in that order and stop
+   at the first unmet one.  Each poll_* flag is left set while we are
+   blocked on its condition and cleared once that condition is met. */
+static int wait_conditions_done(struct lockspace *ls)
+{
+	/* the fencing/quorum/fs conditions need to account for all the changes
+	   that have occurred since the last change applied to dlm-kernel, not
+	   just the latest change */
+
+	poll_fencing = failed_nodes_fenced(ls) ? 0 : 1;
+	if (poll_fencing)
+		return 0;
+
+	/* even though fencing also waits for quorum, checking fencing isn't
+	   sufficient because we don't want to start new lockspaces in an
+	   inquorate cluster */
+
+	poll_quorum = cluster_has_quorum(ls) ? 0 : 1;
+	if (poll_quorum)
+		return 0;
+
+	poll_fs = cluster_filesystem_stopped(ls) ? 0 : 1;
+	if (poll_fs)
+		return 0;
+
+	return 1;
+}
+
+/* Return 1 once every member of the newest change has had an accepted
+   start message recorded (memb->start), else 0; progress is logged. */
+static int wait_messages_done(struct lockspace *ls)
+{
+	struct change *cg = list_first_entry(&ls->changes, struct change, list);
+	struct member *memb;
+	int total = 0, started = 0;
+
+	list_for_each_entry(memb, &cg->members, list) {
+		total++;
+		if (memb->start)
+			started++;
+	}
+
+	if (started < total) {
+		log_group(ls, "wait_messages_done need %d of %d",
+			  total - started, total);
+		return 0;
+	}
+
+	log_group(ls, "wait_messages_done got all %d", total);
+	return 1;
+}
+
+/* After the newest change has been started: keep it as
+   ls->started_change (freeing the previously started one) and free every
+   older pending change, leaving ls->changes empty. */
+static void cleanup_changes(struct lockspace *ls)
+{
+	struct change *newest = list_first_entry(&ls->changes, struct change, list);
+	struct change *old, *next;
+
+	list_del(&newest->list);
+
+	if (ls->started_change)
+		free_cg(ls->started_change);
+	ls->started_change = newest;
+
+	list_for_each_entry_safe(old, next, &ls->changes, list) {
+		list_del(&old->list);
+		free_cg(old);
+	}
+}
+
+/* There's a stream of confchg and messages. At one of these
+   messages, the low node needs to store plocks and new nodes
+   need to begin saving plock messages.  A second message is
+   needed to say that the plocks are ready to be read.
+
+   When the last start message is recvd for a change, the low node
+   stores plocks and the new nodes begin saving messages.  When the
+   store is done, low node sends plocks_stored message.  When
+   new nodes recv this, they read the plocks and their saved messages.
+   plocks_stored message should identify a specific change, like start
+   messages do; if it doesn't match ls->started_change, then it's ignored.
+
+   If a confchg adding a new node arrives after plocks are stored but
+   before plocks_stored msg recvd, then the message is ignored.  The low
+   node will send another plocks_stored message for the latest change
+   (although it may be able to reuse the ckpt if no plock state has changed).
+*/
+
+/* Pick the plock checkpoint node for the newest change: the lowest nodeid
+   among members whose start message carried DLM_MFLG_HAVEPLOCK, or 0 if
+   none did.  If we were the ckpt node and are being replaced, close our
+   checkpoint before recording the new ckpt node. */
+static void set_plock_ckpt_node(struct lockspace *ls)
+{
+	struct change *cg = list_first_entry(&ls->changes, struct change, list);
+	struct member *memb;
+	int low = 0;
+
+	list_for_each_entry(memb, &cg->members, list) {
+		if ((memb->start_flags & DLM_MFLG_HAVEPLOCK) &&
+		    (low == 0 || memb->nodeid < low))
+			low = memb->nodeid;
+	}
+
+	log_group(ls, "set_plock_ckpt_node from %d to %d",
+		  ls->plock_ckpt_node, low);
+
+	if (ls->plock_ckpt_node == our_nodeid && low != our_nodeid) {
+		/* Close ckpt so it will go away when the new ckpt_node
+		   unlinks it prior to creating a new one; if we fail
+		   our open ckpts are automatically closed.  At this point
+		   the ckpt has not been unlinked, but won't be held open by
+		   anyone.  We use the max "retentionDuration" to stop the
+		   system from cleaning up ckpts that are open by no one. */
+		close_plock_checkpoint(ls);
+	}
+
+	ls->plock_ckpt_node = low;
+}
+
+/* do the change details in the message match the details of the given change */
+
+/* Return 1 if the start/plocks_stored message hd (len bytes in total,
+   header already in host order) describes change cg, else 0 (the reason
+   is logged).  The payload after the header is little-endian ints:
+   member_count, joined_count, remove_count, failed_count, followed by
+   member_count nodeids.  The payload comes from another node, so its
+   length is validated before any of it is read. */
+static int match_change(struct lockspace *ls, struct change *cg,
+			struct dlm_header *hd, int len)
+{
+	struct member *memb;
+	int member_count, joined_count, remove_count, failed_count;
+	int i, n_ints, avail_ints, *nums, nodeid, members_mismatch;
+	int hdr_len = (int)sizeof(struct dlm_header);
+	uint32_t seq = hd->msgdata;
+
+	nums = (int *)((char *)hd + sizeof(struct dlm_header));
+
+	/* whole ints actually present after the header; never read more */
+	avail_ints = (len > hdr_len) ? (len - hdr_len) / (int)sizeof(int) : 0;
+
+	if (avail_ints < 4) {
+		log_group(ls, "match_change fail %d:%u short len %d",
+			  hd->nodeid, seq, len);
+		return 0;
+	}
+
+	member_count = le32_to_cpu(nums[0]);
+	joined_count = le32_to_cpu(nums[1]);
+	remove_count = le32_to_cpu(nums[2]);
+	failed_count = le32_to_cpu(nums[3]);
+
+	/* range-check member_count before using it, so neither the size
+	   computation nor the log below can run past the buffer */
+	if (member_count < 0 || member_count > avail_ints - 4 ||
+	    len != hdr_len + (4 + member_count) * (int)sizeof(int)) {
+		log_group(ls, "match_change fail %d:%u bad len %d nums %s",
+			  hd->nodeid, seq, len, str_nums(nums, avail_ints));
+		return 0;
+	}
+	n_ints = 4 + member_count;
+
+	/* We can ignore messages if we're not in the list of members.  The one
+	   known time this will happen is after we've joined the cpg, we can
+	   get messages for changes prior to the change in which we're added. */
+
+	for (i = 0; i < member_count; i++) {
+		if (our_nodeid == le32_to_cpu(nums[4+i]))
+			break;
+	}
+	if (i == member_count) {
+		log_group(ls, "match_change fail %d:%u we are not in members",
+			  hd->nodeid, seq);
+		return 0;
+	}
+
+	memb = find_memb(cg, hd->nodeid);
+	if (!memb) {
+		log_group(ls, "match_change fail %d:%u sender not member",
+			  hd->nodeid, seq);
+		return 0;
+	}
+
+	/* verify this is the right change by matching the counts
+	   and the nodeids of the current members */
+
+	if (member_count != cg->member_count ||
+	    joined_count != cg->joined_count ||
+	    remove_count != cg->remove_count ||
+	    failed_count != cg->failed_count) {
+		log_group(ls, "match_change fail %d:%u expect counts "
+			  "%d %d %d %d nums %s",
+			  hd->nodeid, seq,
+			  cg->member_count, cg->joined_count,
+			  cg->remove_count, cg->failed_count,
+			  str_nums(nums, n_ints));
+		return 0;
+	}
+
+	members_mismatch = 0;
+	for (i = 0; i < member_count; i++) {
+		nodeid = le32_to_cpu(nums[4+i]);
+		memb = find_memb(cg, nodeid);
+		if (memb)
+			continue;
+		log_group(ls, "match_change fail %d:%u no memb %d",
+			  hd->nodeid, seq, nodeid);
+		members_mismatch = 1;
+	}
+	if (members_mismatch)
+		return 0;
+
+	return 1;
+}
+
+/* Announce that plock state for the newest change has been stored in
+   the checkpoint.  The payload describes the change exactly as a start
+   message does (four counts followed by member nodeids, as ints), so
+   receivers can match it with match_change().  An allocation failure is
+   logged and nothing is sent. */
+static void send_plocks_stored(struct lockspace *ls)
+{
+	struct change *cg = list_first_entry(&ls->changes, struct change, list);
+	struct dlm_header *hd;
+	struct member *memb;
+	int n_ints, len, *p, i;
+	char *buf;
+
+	n_ints = 4 + cg->member_count;
+	/* sizeof(int) matches the int payload and match_change()'s check */
+	len = sizeof(struct dlm_header) + (n_ints * sizeof(int));
+
+	buf = malloc(len);
+	if (!buf) {
+		log_error("send_plocks_stored no memory");
+		return;
+	}
+	memset(buf, 0, len);
+
+	hd = (struct dlm_header *)buf;
+	hd->type = DLM_MSG_PLOCKS_STORED;
+	hd->msgdata = cg->seq;
+
+	p = (int *)(buf + sizeof(struct dlm_header));
+
+	/* sending all this stuff is probably unnecessary, but gives
+	   us more certainty in matching this message to the correct
+	   change that it is for */
+
+	p[0] = cpu_to_le32(cg->member_count);
+	p[1] = cpu_to_le32(cg->joined_count);
+	p[2] = cpu_to_le32(cg->remove_count);
+	p[3] = cpu_to_le32(cg->failed_count);
+
+	i = 4;
+	list_for_each_entry(memb, &cg->members, list)
+		p[i++] = cpu_to_le32(memb->nodeid);
+
+	dlm_send_message(ls, buf, len);
+
+	free(buf);
+}
+
+/* Handle a plocks_stored message from the ckpt node.  It only matters
+   while we still need plocks (ls->need_plocks).  It is acted on only if
+   no newer change is pending and it matches our last started change.  In
+   that case we read plock state (retrieve_plocks), process the plock
+   messages saved in the meantime (process_saved_plocks), and clear
+   need_plocks and save_plocks. */
+static void receive_plocks_stored(struct lockspace *ls, struct dlm_header *hd,
+				  int len)
+{
+	log_group(ls, "receive_plocks_stored %d:%u need_plocks %d",
+		  hd->nodeid, hd->msgdata, ls->need_plocks);
+
+	if (!ls->need_plocks)
+		return;
+
+	/* a confchg arrived between the last start and the plocks_stored msg,
+	   so we ignore this plocks_stored msg and wait to read the ckpt until
+	   the next plocks_stored msg following the current start */
+
+	if (!list_empty(&ls->changes) || !ls->started_change ||
+	    !match_change(ls, ls->started_change, hd, len)) {
+		log_group(ls, "receive_plocks_stored %d:%u ignore",
+			  hd->nodeid, hd->msgdata);
+		return;
+	}
+
+	retrieve_plocks(ls);
+	process_saved_plocks(ls);
+	ls->need_plocks = 0;
+	ls->save_plocks = 0;
+}
+
+/* Unfortunately, there's no really simple way to match a message with the
+   specific change that it was sent for.  We hope that by passing all the
+   details of the change in the message, we will be able to uniquely match
+   it to the correct change. */
+
+/* A start message will usually be for the first (current) change on our list.
+   In some cases it will be for a non-current change, and we can ignore it:
+
+   1. A,B,C get confchg1 adding C
+   2. C sends start for confchg1
+   3. A,B,C get confchg2 adding D
+   4. A,B,C,D recv start from C for confchg1 - ignored
+   5. C,D send start for confchg2
+   6. A,B send start for confchg2
+   7. A,B,C,D recv all start messages for confchg2, and start kernel
+ 
+   In step 4, how do the nodes know whether the start message from C is
+   for confchg1 or confchg2?  Hopefully by comparing the counts and members. */
+
+/* Return the oldest pending change that message hd matches, or NULL.
+   The scan runs from the tail of ls->changes, where the oldest changes
+   are, because add_change() inserts new changes at the head. */
+static struct change *find_change(struct lockspace *ls, struct dlm_header *hd,
+				  int len)
+{
+	struct change *cg;
+
+	list_for_each_entry_reverse(cg, &ls->changes, list) {
+		if (match_change(ls, cg, hd, len))
+			return cg;
+	}
+
+	log_group(ls, "find_change %d:%u no match", hd->nodeid, hd->msgdata);
+	return NULL;
+}
+
+/* We require new members (memb->added) to be joining the lockspace
+   (DLM_MFLG_JOINING set in their start message).  New members that are
+   not joining the lockspace can appear when the cpg partitions and is then
+   merged back together (shouldn't happen in general, but is possible).  We
+   label these new members that are not joining as "disallowed", and ignore
+   their start message. */
+
+/* Handle spurious joins by ignoring this start message if the node says it's
+   not joining (i.e. it's already a member), but we see it being added (i.e.
+   it's not already a member) */
+
+/* Record a start message against the pending change it describes.  The
+   sender's claim (DLM_MFLG_JOINING) must agree with our view of it
+   (memb->added).  If it does, the member is marked started and its
+   add_time is stamped; otherwise it is marked disallowed. */
+static void receive_start(struct lockspace *ls, struct dlm_header *hd, int len)
+{
+	struct change *cg;
+	struct member *memb;
+	uint32_t seq = hd->msgdata;
+	int joining, added;
+
+	log_group(ls, "receive_start %d:%u flags %x len %d", hd->nodeid, seq,
+		  hd->flags, len);
+
+	cg = find_change(ls, hd, len);
+	if (cg == NULL)
+		return;
+
+	memb = find_memb(cg, hd->nodeid);
+	if (memb == NULL) {
+		/* this should never happen since match_change checks it */
+		log_error("receive_start no member %d", hd->nodeid);
+		return;
+	}
+
+	memb->start_flags = hd->flags;
+	joining = (memb->start_flags & DLM_MFLG_JOINING) ? 1 : 0;
+	added = memb->added ? 1 : 0;
+
+	if (added == joining) {
+		node_history_start(ls, hd->nodeid);
+		memb->start = 1;
+		return;
+	}
+
+	log_error("receive_start %d:%u disallowed added %d joining %d",
+		  hd->nodeid, seq, memb->added, joining);
+	memb->disallowed = 1;
+}
+
+/* Send our start message for the newest change.  The flags say whether
+   we joined in this change (DLM_MFLG_JOINING) and whether we already
+   hold plock state (DLM_MFLG_HAVEPLOCK, i.e. !need_plocks).  The payload
+   is four counts followed by member nodeids, for match_change() on the
+   receivers.  An allocation failure is logged: without the message the
+   change would otherwise wait forever with no trace. */
+static void send_start(struct lockspace *ls)
+{
+	struct change *cg = list_first_entry(&ls->changes, struct change, list);
+	struct dlm_header *hd;
+	struct member *memb;
+	int n_ints, len, *p, i;
+	char *buf;
+
+	n_ints = 4 + cg->member_count;
+	len = sizeof(struct dlm_header) + (n_ints * sizeof(int));
+
+	buf = malloc(len);
+	if (!buf) {
+		log_error("send_start no memory");
+		return;
+	}
+	memset(buf, 0, len);
+
+	hd = (struct dlm_header *)buf;
+	hd->type = DLM_MSG_START;
+	hd->msgdata = cg->seq;
+
+	if (cg->we_joined)
+		hd->flags |= DLM_MFLG_JOINING;
+
+	if (!ls->need_plocks)
+		hd->flags |= DLM_MFLG_HAVEPLOCK;
+
+	p = (int *)(buf + sizeof(struct dlm_header));
+
+	/* sending all this stuff is probably unnecessary, but gives
+	   us more certainty in matching start messages to the correct
+	   change that they are for */
+
+	p[0] = cpu_to_le32(cg->member_count);
+	p[1] = cpu_to_le32(cg->joined_count);
+	p[2] = cpu_to_le32(cg->remove_count);
+	p[3] = cpu_to_le32(cg->failed_count);
+
+	i = 4;
+	list_for_each_entry(memb, &cg->members, list)
+		p[i++] = cpu_to_le32(memb->nodeid);
+
+	log_group(ls, "send_start %u flags %x counts %d %d %d %d", cg->seq,
+		  hd->flags, cg->member_count, cg->joined_count,
+		  cg->remove_count, cg->failed_count);
+
+	dlm_send_message(ls, buf, len);
+
+	free(buf);
+}
+
+/* Return 1 if any pending change on ls->changes added nodes, else 0. */
+static int nodes_added(struct lockspace *ls)
+{
+	struct change *cg;
+	int added = 0;
+
+	list_for_each_entry(cg, &ls->changes, list) {
+		if (cg->joined_count != 0) {
+			added = 1;
+			break;
+		}
+	}
+
+	return added;
+}
+
+/* Arrange plock state transfer for the change just started in the kernel.
+   Does nothing unless plocks are enabled (cfgd_enable_plock).
+   - A sole member becomes the ckpt node and needs no plocks.
+   - Otherwise the ckpt node is chosen from the start messages.
+   - A node still needing plocks starts saving plock messages.
+   - The ckpt node stores plocks (if nodes were added) and sends
+     plocks_stored. */
+static void prepare_plocks(struct lockspace *ls)
+{
+	struct change *cg = list_first_entry(&ls->changes, struct change, list);
+	struct member *memb;
+
+	if (!cfgd_enable_plock)
+		return;
+
+	/* if we're the only node in the lockspace, then we are the ckpt_node
+	   and we don't need plocks */
+
+	if (cg->member_count == 1) {
+		list_for_each_entry(memb, &cg->members, list) {
+			if (memb->nodeid != our_nodeid) {
+				log_error("prepare_plocks other member %d",
+					  memb->nodeid);
+			}
+		}
+		ls->plock_ckpt_node = our_nodeid;
+		ls->need_plocks = 0;
+		return;
+	}
+
+	/* the low node that indicated it had plock state in its last
+	   start message is the ckpt_node */
+
+	set_plock_ckpt_node(ls);
+
+	/* We save all plock messages from the time that the low node saves
+	   existing plock state in the ckpt to the time that we read that state
+	   from the ckpt. */
+
+	if (ls->need_plocks) {
+		ls->save_plocks = 1;
+		return;
+	}
+
+	if (ls->plock_ckpt_node != our_nodeid)
+		return;
+
+	/* At each start, a ckpt is written if there have been nodes added
+	   since the last start/ckpt.  If no nodes have been added, no one
+	   does anything with ckpts.  If the node that wrote the last ckpt
+	   is no longer the ckpt_node, the new ckpt_node will unlink and
+	   write a new one.  If the node that wrote the last ckpt is still
+	   the ckpt_node and no plock state has changed since the last ckpt,
+	   it will just leave the old ckpt and not write a new one.
+	 
+	   A new ckpt_node will send a stored message even if it doesn't
+	   write a ckpt because new nodes in the previous start may be
+	   waiting to read the ckpt from the previous ckpt_node after ignoring
+	   the previous stored message.  They will read the ckpt from the
+	   previous ckpt_node upon receiving the stored message from us. */
+
+	if (nodes_added(ls))
+		store_plocks(ls);
+	send_plocks_stored(ls);
+}
+
+/* Advance the newest pending change of ls by one step:
+   CGST_WAIT_CONDITIONS: once conditions allow, send our start message
+   and move to CGST_WAIT_MESSAGES.
+   CGST_WAIT_MESSAGES: once every member has started, start the kernel,
+   prepare plocks and retire the pending changes. */
+static void apply_changes(struct lockspace *ls)
+{
+	struct change *cg;
+
+	if (list_empty(&ls->changes))
+		return;
+	cg = list_first_entry(&ls->changes, struct change, list);
+
+	if (cg->state == CGST_WAIT_CONDITIONS) {
+		if (!wait_conditions_done(ls))
+			return;
+		send_start(ls);
+		cg->state = CGST_WAIT_MESSAGES;
+	} else if (cg->state == CGST_WAIT_MESSAGES) {
+		if (!wait_messages_done(ls))
+			return;
+		start_kernel(ls);
+		prepare_plocks(ls);
+		cleanup_changes(ls);
+	} else {
+		log_error("apply_changes invalid state %d", cg->state);
+	}
+}
+
+/* Let every lockspace that has pending changes try to make progress. */
+void process_lockspace_changes(void)
+{
+	struct lockspace *ls, *next;
+
+	list_for_each_entry_safe(ls, next, &lockspaces, list) {
+		if (list_empty(&ls->changes))
+			continue;
+		apply_changes(ls);
+	}
+}
+
+/* Build a struct change from a cpg confchg and insert it at the head of
+   ls->changes, so the head is always the newest change.  Also updates
+   the node history: removed nodes are recorded as failed or left.
+   Joined nodes get history entries, and every member does when we are
+   the one joining.
+   Returns 0 and sets *cg_out on success.  On failure it returns
+   -ENOMEM or -ENOENT and adds nothing to ls->changes; node history
+   updates already made are not undone. */
+static int add_change(struct lockspace *ls,
+		      struct cpg_address *member_list, int member_list_entries,
+		      struct cpg_address *left_list, int left_list_entries,
+		      struct cpg_address *joined_list, int joined_list_entries,
+		      struct change **cg_out)
+{
+	struct change *cg;
+	struct member *memb;
+	int i, error;
+
+	cg = malloc(sizeof(struct change));
+	if (!cg)
+		goto fail_nomem;
+	memset(cg, 0, sizeof(struct change));
+	INIT_LIST_HEAD(&cg->members);
+	INIT_LIST_HEAD(&cg->removed);
+	cg->seq = ++ls->change_seq;
+	cg->state = CGST_WAIT_CONDITIONS;
+
+	cg->member_count = member_list_entries;
+	cg->joined_count = joined_list_entries;
+	cg->remove_count = left_list_entries;
+
+	for (i = 0; i < member_list_entries; i++) {
+		memb = malloc(sizeof(struct member));
+		if (!memb)
+			goto fail_nomem;
+		memset(memb, 0, sizeof(struct member));
+		memb->nodeid = member_list[i].nodeid;
+		list_add_tail(&memb->list, &cg->members);
+	}
+
+	for (i = 0; i < left_list_entries; i++) {
+		memb = malloc(sizeof(struct member));
+		if (!memb)
+			goto fail_nomem;
+		memset(memb, 0, sizeof(struct member));
+		memb->nodeid = left_list[i].nodeid;
+		/* node or daemon died, as opposed to a clean cpg leave */
+		if (left_list[i].reason == CPG_REASON_NODEDOWN ||
+		    left_list[i].reason == CPG_REASON_PROCDOWN) {
+			memb->failed = 1;
+			cg->failed_count++;
+		}
+		list_add_tail(&memb->list, &cg->removed);
+
+		if (memb->failed)
+			node_history_fail(ls, memb->nodeid);
+		else
+			node_history_left(ls, memb->nodeid);
+
+		log_group(ls, "add_change %u nodeid %d remove reason %d",
+			  cg->seq, memb->nodeid, left_list[i].reason);
+	}
+
+	for (i = 0; i < joined_list_entries; i++) {
+		memb = find_memb(cg, joined_list[i].nodeid);
+		if (!memb) {
+			log_error("no member %d", joined_list[i].nodeid);
+			error = -ENOENT;
+			goto fail;
+		}
+		memb->added = 1;
+
+		if (memb->nodeid == our_nodeid)
+			cg->we_joined = 1;
+		else
+			node_history_init(ls, memb->nodeid);
+
+		log_group(ls, "add_change %u nodeid %d joined", cg->seq,
+			  memb->nodeid);
+	}
+
+	/* when we join, we have no history yet for any existing member */
+	if (cg->we_joined)
+		list_for_each_entry(memb, &cg->members, list)
+			node_history_init(ls, memb->nodeid);
+
+	log_group(ls, "add_change %u member %d joined %d remove %d failed %d",
+		  cg->seq, cg->member_count, cg->joined_count, cg->remove_count,
+		  cg->failed_count);
+
+	list_add(&cg->list, &ls->changes);
+	*cg_out = cg;
+	return 0;
+
+ fail_nomem:
+	log_error("no memory");
+	error = -ENOMEM;
+ fail:
+	/* cg is NULL when its own allocation failed; free_cg() would
+	   dereference it */
+	if (cg)
+		free_cg(cg);
+	return error;
+}
+
+/* Return 1 if our_nodeid appears in the confchg left list, else 0. */
+static int we_left(struct cpg_address *left_list, int left_list_entries)
+{
+	int idx = 0;
+
+	while (idx < left_list_entries) {
+		if (left_list[idx].nodeid == our_nodeid)
+			return 1;
+		idx++;
+	}
+
+	return 0;
+}
+
+/* cpg configuration-change callback for a lockspace cpg.  If this is the
+   confchg for a leave we requested, tear the lockspace down completely:
+   stop the kernel, clear its members, finalize the cpg, purge our plocks
+   and free it.  Otherwise record the change, stop the kernel lockspace
+   and purge plocks of the removed nodes.  The recorded change is advanced
+   afterwards by apply_changes() (via process_lockspace_changes(),
+   presumably from the main loop -- confirm). */
+static void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
+		       struct cpg_address *member_list, int member_list_entries,
+		       struct cpg_address *left_list, int left_list_entries,
+		       struct cpg_address *joined_list, int joined_list_entries)
+{
+	struct lockspace *ls;
+	struct change *cg;
+	struct member *memb;
+	int rv;
+
+	ls = find_ls_handle(handle);
+	if (!ls) {
+		log_error("confchg_cb no lockspace for cpg %s",
+			  group_name->value);
+		return;
+	}
+
+	if (ls->leaving && we_left(left_list, left_list_entries)) {
+		/* we called cpg_leave(), and this should be the final
+		   cpg callback we receive */
+		log_group(ls, "confchg for our leave");
+		stop_kernel(ls, 0);
+		set_configfs_members(ls->name, 0, NULL, 0, NULL);
+		set_sysfs_event_done(ls->name, 0);
+		cpg_finalize(ls->cpg_handle);
+		client_dead(ls->cpg_client);
+		purge_plocks(ls, our_nodeid, 1);
+		list_del(&ls->list);
+		free_ls(ls);
+		return;
+	}
+
+	rv = add_change(ls, member_list, member_list_entries,
+			left_list, left_list_entries,
+			joined_list, joined_list_entries, &cg);
+	if (rv)
+		return;
+
+	/* the kernel stays stopped until start_kernel() applies the change */
+	stop_kernel(ls, cg->seq);
+
+	list_for_each_entry(memb, &cg->removed, list)
+		purge_plocks(ls, memb->nodeid, 0);
+
+#if 0
+	/* deadlock code needs to adjust per a confchg, is this the right
+	   way/place for this? */
+
+	deadlk_confchg(ls, member_list, member_list_entries,
+		       left_list, left_list_entries,
+		       joined_list, joined_list_entries);
+#endif
+}
+
+/* cpg message-delivery callback for a lockspace cpg.  Checks the message
+   is at least a full dlm_header and converts the header to host byte
+   order in place.  Then rejects messages with a different major protocol
+   version or a header nodeid that differs from the cpg sender, and
+   dispatches the rest on hd->type.  len is the whole message length
+   including the header. */
+static void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name,
+		       uint32_t nodeid, uint32_t pid, void *data, int len)
+{
+	struct lockspace *ls;
+	struct dlm_header *hd;
+
+	ls = find_ls_handle(handle);
+	if (!ls) {
+		log_error("deliver_cb no ls for cpg %s", group_name->value);
+		return;
+	}
+
+	/* the header is byte-swapped in place below; a short message from
+	   the network must not be read or written past its end */
+	if (len < (int)sizeof(struct dlm_header)) {
+		log_error("deliver_cb short message len %d from %u",
+			  len, nodeid);
+		return;
+	}
+
+	hd = (struct dlm_header *)data;
+
+	hd->version[0]  = le16_to_cpu(hd->version[0]);
+	hd->version[1]  = le16_to_cpu(hd->version[1]);
+	hd->version[2]  = le16_to_cpu(hd->version[2]);
+	hd->type        = le16_to_cpu(hd->type);
+	hd->nodeid      = le32_to_cpu(hd->nodeid);
+	hd->to_nodeid   = le32_to_cpu(hd->to_nodeid);
+	hd->global_id   = le32_to_cpu(hd->global_id);
+	hd->flags       = le32_to_cpu(hd->flags);
+	hd->msgdata     = le32_to_cpu(hd->msgdata);
+
+	if (hd->version[0] != protocol_active[0]) {
+		log_error("reject message from %d version %u.%u.%u vs %u.%u.%u",
+			  nodeid, hd->version[0], hd->version[1],
+			  hd->version[2], protocol_active[0],
+			  protocol_active[1], protocol_active[2]);
+		return;
+	}
+
+	if (hd->nodeid != nodeid) {
+		log_error("bad msg nodeid %d %d", hd->nodeid, nodeid);
+		return;
+	}
+
+	switch (hd->type) {
+	case DLM_MSG_START:
+		receive_start(ls, hd, len);
+		break;
+
+	case DLM_MSG_PLOCK:
+		receive_plock(ls, hd, len);
+		break;
+
+	case DLM_MSG_PLOCK_OWN:
+		receive_own(ls, hd, len);
+		break;
+
+	case DLM_MSG_PLOCK_DROP:
+		receive_drop(ls, hd, len);
+		break;
+
+	case DLM_MSG_PLOCK_SYNC_LOCK:
+	case DLM_MSG_PLOCK_SYNC_WAITER:
+		receive_sync(ls, hd, len);
+		break;
+
+	case DLM_MSG_PLOCKS_STORED:
+		receive_plocks_stored(ls, hd, len);
+		break;
+
+	case DLM_MSG_DEADLK_CYCLE_START:
+		receive_cycle_start(ls, hd, len);
+		break;
+
+	case DLM_MSG_DEADLK_CYCLE_END:
+		receive_cycle_end(ls, hd, len);
+		break;
+
+	case DLM_MSG_DEADLK_CHECKPOINT_READY:
+		receive_checkpoint_ready(ls, hd, len);
+		break;
+
+	case DLM_MSG_DEADLK_CANCEL_LOCK:
+		receive_cancel_lock(ls, hd, len);
+		break;
+
+	default:
+		log_error("unknown msg type %d", hd->type);
+	}
+}
+
+/* libcpg callback table passed to cpg_initialize() for both the
+   per-lockspace handles and daemon_handle */
+static cpg_callbacks_t cpg_callbacks = {
+	.cpg_confchg_fn = confchg_cb,	/* membership changes */
+	.cpg_deliver_fn = deliver_cb,	/* incoming messages */
+};
+
+/*
+ * Ask libcpg for the flow-control state of daemon_handle and record it in
+ * message_flow_control_on (1 = enabled, 0 = not).  Only on/off
+ * transitions are logged.  If the query fails, the error is logged and
+ * the previous value is left unchanged.
+ *
+ * NOTE(review): this queries daemon_handle, while lockspace traffic is
+ * dispatched on ls->cpg_handle -- confirm this is the intended handle.
+ */
+void update_flow_control_status(void)
+{
+	cpg_flow_control_state_t state;
+	cpg_error_t err;
+	int enabled;
+
+	err = cpg_flow_control_state_get(daemon_handle, &state);
+	if (err != CPG_OK) {
+		log_error("cpg_flow_control_state_get %d", err);
+		return;
+	}
+
+	enabled = (state == CPG_FLOW_CONTROL_ENABLED) ? 1 : 0;
+
+	if (enabled && !message_flow_control_on)
+		log_debug("flow control on");
+	else if (!enabled && message_flow_control_on)
+		log_debug("flow control off");
+
+	message_flow_control_on = enabled;
+}
+
+/*
+ * Poll handler for a lockspace's cpg fd (registered through client_add()
+ * in dlm_join_lockspace()).  Runs every pending cpg callback for the
+ * lockspace, then calls apply_changes() on it and refreshes the flow
+ * control state.
+ */
+static void process_lockspace_cpg(int ci)
+{
+	struct lockspace *ls;
+	cpg_error_t error;
+
+	ls = find_ls_ci(ci);
+	if (!ls) {
+		log_error("process_lockspace_cpg no lockspace for ci %d", ci);
+		return;
+	}
+
+	error = cpg_dispatch(ls->cpg_handle, CPG_DISPATCH_ALL);
+	if (error != CPG_OK) {
+		log_error("cpg_dispatch error %d", error);
+		return;
+	}
+
+	/* When confchg_cb() sees the confchg for our own leave, it calls
+	   list_del() and free_ls() on the lockspace from inside
+	   cpg_dispatch().  The pointer fetched above may therefore be freed
+	   now, so look the lockspace up again instead of using it.
+	   NOTE(review): relies on find_ls_ci() searching the lockspaces
+	   list that confchg_cb() unlinks it from. */
+	ls = find_ls_ci(ci);
+	if (ls)
+		apply_changes(ls);
+
+	update_flow_control_status();
+}
+
+/* received an "online" uevent from dlm-kernel */
+
+/*
+ * Join the cpg named "dlm:<ls->name>" on behalf of ls.
+ *
+ * On success, ls is on the lockspaces list and owns a cpg handle whose fd
+ * is polled through client_add() (process_lockspace_cpg).  The function
+ * then returns 0.
+ *
+ * On failure, everything acquired here is released exactly once and ls
+ * itself is freed with free_ls().  A nonzero value is returned: the
+ * cpg_error_t from libcpg, or -1 for other errors.  The caller must not
+ * use ls afterwards.
+ *
+ * cpg_join() is retried once a second for as long as it returns
+ * CPG_ERR_TRY_AGAIN.
+ */
+int dlm_join_lockspace(struct lockspace *ls)
+{
+	cpg_error_t error;
+	cpg_handle_t h;
+	struct cpg_name name;
+	int i = 0, fd, ci, n, rv;
+
+	/* Build the cpg name first.  name.value is a fixed-size array, so
+	   an unbounded sprintf() could overflow it.  A silently truncated
+	   name could collide with another lockspace's cpg, so reject it. */
+	memset(&name, 0, sizeof(name));
+	n = snprintf(name.value, sizeof(name.value), "dlm:%s", ls->name);
+	if (n < 0 || n >= (int)sizeof(name.value)) {
+		log_error("cpg name too long for lockspace %s", ls->name);
+		rv = -1;
+		goto fail_free;
+	}
+	name.length = n + 1;
+
+	error = cpg_initialize(&h, &cpg_callbacks);
+	if (error != CPG_OK) {
+		log_error("cpg_initialize error %d", error);
+		rv = error;
+		goto fail_free;
+	}
+
+	error = cpg_fd_get(h, &fd);
+	if (error != CPG_OK) {
+		log_error("cpg_fd_get error %d", error);
+		rv = error;
+		goto fail_finalize;
+	}
+
+	ci = client_add(fd, process_lockspace_cpg, NULL);
+	if (ci < 0) {
+		log_error("client_add error %d", ci);
+		rv = -1;
+		goto fail_finalize;
+	}
+
+	list_add(&ls->list, &lockspaces);
+
+	ls->cpg_handle = h;
+	ls->cpg_client = ci;
+	ls->cpg_fd = fd;
+	ls->kernel_stopped = 1;
+	ls->need_plocks = 1;
+	ls->joining = 1;
+
+	/* TODO: allow global_id to be set in cluster.conf? */
+	ls->global_id = cpgname_to_crc(name.value, name.length);
+
+ retry:
+	error = cpg_join(h, &name);
+	if (error == CPG_ERR_TRY_AGAIN) {
+		sleep(1);
+		if (!(++i % 10))
+			log_error("cpg_join error retrying");
+		goto retry;
+	}
+	if (error != CPG_OK) {
+		log_error("cpg_join error %d", error);
+		rv = error;
+		/* h is finalized once, on the unwind path below */
+		goto fail;
+	}
+
+	return 0;
+
+ fail:
+	list_del(&ls->list);
+	client_dead(ci);
+ fail_finalize:
+	cpg_finalize(h);
+ fail_free:
+	free_ls(ls);
+	return rv;
+}
+
+/* received an "offline" uevent from dlm-kernel */
+
+/*
+ * Ask to leave the cpg "dlm:<ls->name>".  This only sets ls->leaving and
+ * calls cpg_leave().  The lockspace is torn down later, in confchg_cb(),
+ * when the confchg reporting our own leave arrives.
+ *
+ * cpg_leave() is retried once a second while it returns
+ * CPG_ERR_TRY_AGAIN.  Any other failure is logged.  Always returns 0.
+ */
+int dlm_leave_lockspace(struct lockspace *ls)
+{
+	cpg_error_t error;
+	struct cpg_name name;
+	int i = 0;
+
+	ls->leaving = 1;
+
+	/* bounded write: name.value is a fixed-size array */
+	memset(&name, 0, sizeof(name));
+	snprintf(name.value, sizeof(name.value), "dlm:%s", ls->name);
+	name.length = strlen(name.value) + 1;
+
+ retry:
+	error = cpg_leave(ls->cpg_handle, &name);
+	if (error == CPG_ERR_TRY_AGAIN) {
+		sleep(1);
+		if (!(++i % 10))
+			log_error("cpg_leave error retrying");
+		goto retry;
+	}
+	if (error != CPG_OK)
+		log_error("cpg_leave error %d", error);
+
+	return 0;
+}
+
+/*
+ * Open the daemon-wide cpg handle, daemon_handle, using the shared
+ * callback table.  Returns 0 on success and -1 if cpg_initialize() fails.
+ */
+int setup_cpg(void)
+{
+	cpg_error_t err = cpg_initialize(&daemon_handle, &cpg_callbacks);
+
+	if (err == CPG_OK) {
+		/* Open question: should we join a "dlm_controld" cpg here, to
+		   interact with other daemons in the cluster before processing
+		   uevents?  Could that also help with transient partitions? */
+		return 0;
+	}
+
+	log_error("setup_cpg cpg_initialize error %d", err);
+	return -1;
+}
+
diff --git a/gfs2/include/gfs2_disk_hash.h b/group/dlm_controld/crc.c
similarity index 96%
copy from gfs2/include/gfs2_disk_hash.h
copy to group/dlm_controld/crc.c
index 81e4196..cb486be 100644
--- a/gfs2/include/gfs2_disk_hash.h
+++ b/group/dlm_controld/crc.c
@@ -11,11 +11,9 @@
 *******************************************************************************
 ******************************************************************************/
 
-#ifndef __GFS2_DISK_HASH_DOT_H__
-#define __GFS2_DISK_HASH_DOT_H__
+#include "dlm_daemon.h"
 
-static const uint32_t crc_32_tab[] =
-{
+static const uint32_t crc_32_tab[] = {
   0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
   0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988, 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
   0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
@@ -51,6 +49,9 @@ static const uint32_t crc_32_tab[] =
 };
 
 /**
+ *
+ * Copied from:
+ *
  * gfs2_disk_hash - hash an array of data
  * @data: the data to be hashed
  * @len: the length of data to be hashed
@@ -70,7 +71,7 @@ static const uint32_t crc_32_tab[] =
  * Returns: the hash
  */
 
-uint32_t gfs2_disk_hash(const char *data, int len)
+uint32_t cpgname_to_crc(const char *data, int len)
 {
 	uint32_t hash = 0xFFFFFFFF;
 
@@ -82,5 +83,3 @@ uint32_t gfs2_disk_hash(const char *data, int len)
 	return hash;
 }
 
-#endif
-
diff --git a/group/dlm_controld/deadlock.c b/group/dlm_controld/deadlock.c
index f21beda..e9f7986 100644
--- a/group/dlm_controld/deadlock.c
+++ b/group/dlm_controld/deadlock.c
@@ -11,13 +11,9 @@
 ******************************************************************************/
 
 #include "dlm_daemon.h"
+#include "config.h"
 #include "libdlm.h"
 
-int deadlock_enabled = 0;
-
-extern struct list_head lockspaces;
-extern int our_nodeid;
-
 static SaCkptHandleT global_ckpt_h;
 static SaCkptCallbacksT callbacks = { 0, 0 };
 static SaVersionT version = { 'B', 1, 1 };
@@ -96,26 +92,6 @@ struct trans {
 							 pointers */
 };
 
-#define DLM_HEADER_MAJOR	1
-#define DLM_HEADER_MINOR	0
-#define DLM_HEADER_PATCH	0
-
-#define DLM_MSG_CYCLE_START	 1
-#define DLM_MSG_CYCLE_END	 2
-#define DLM_MSG_CHECKPOINT_READY 3
-#define DLM_MSG_CANCEL_LOCK	 4
-
-struct dlm_header {
-	uint16_t		version[3];
-	uint16_t		type; /* MSG_ */
-	uint32_t		nodeid; /* sender */
-	uint32_t		to_nodeid; /* 0 if to all */
-	uint32_t		global_id;
-	uint32_t		lkid;
-	uint32_t		pad;
-	char			name[MAXNAME];
-};
-
 static const int __dlm_compat_matrix[8][8] = {
       /* UN NL CR CW PR PW EX PD */
         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
@@ -184,27 +160,11 @@ void setup_deadlock(void)
 {
 	SaAisErrorT rv;
 
-	if (!deadlock_enabled)
-		return;
-
 	rv = saCkptInitialize(&global_ckpt_h, &callbacks, &version);
 	if (rv != SA_AIS_OK)
 		log_error("ckpt init error %d", rv);
 }
 
-/* FIXME: use private data hooks into libcpg to save ls */
-
-static struct lockspace *find_ls_by_handle(cpg_handle_t h)
-{
-	struct lockspace *ls;
-
-	list_for_each_entry(ls, &lockspaces, list) {
-		if (ls->cpg_h == h)
-			return ls;
-	}
-	return NULL;
-}
-
 static struct dlm_rsb *get_resource(struct lockspace *ls, char *name, int len)
 {
 	struct dlm_rsb *r;
@@ -599,7 +559,7 @@ static int _unlink_checkpoint(struct lockspace *ls, SaNameT *name)
 	int ret = 0;
 	int retries;
 
-	h = (SaCkptCheckpointHandleT) ls->lock_ckpt_handle;
+	h = (SaCkptCheckpointHandleT) ls->deadlk_ckpt_handle;
 	log_group(ls, "unlink ckpt %llx", (unsigned long long)h);
 
 	retries = 0;
@@ -655,7 +615,7 @@ static int _unlink_checkpoint(struct lockspace *ls, SaNameT *name)
 			  (unsigned long long)h, rv, ls->name);
 	}
  out:
-	ls->lock_ckpt_handle = 0;
+	ls->deadlk_ckpt_handle = 0;
 	return ret;
 }
 
@@ -819,7 +779,7 @@ static void write_checkpoint(struct lockspace *ls)
 	name.length = len;
 
 	/* unlink an old checkpoint before we create a new one */
-	if (ls->lock_ckpt_handle) {
+	if (ls->deadlk_ckpt_handle) {
 		log_error("write_checkpoint: old ckpt");
 		if (_unlink_checkpoint(ls, &name))
 			return;
@@ -880,7 +840,7 @@ static void write_checkpoint(struct lockspace *ls)
 
 	log_group(ls, "write_checkpoint: open ckpt handle %llx",
 		  (unsigned long long)h);
-	ls->lock_ckpt_handle = (uint64_t) h;
+	ls->deadlk_ckpt_handle = (uint64_t) h;
 
 	list_for_each_entry(r, &ls->resources, list) {
 		memset(buf, 0, sizeof(buf));
@@ -918,34 +878,8 @@ static void write_checkpoint(struct lockspace *ls)
 	}
 }
 
-static int _send_message(cpg_handle_t h, void *buf, int len, int type)
-{
-	struct iovec iov;
-	cpg_error_t error;
-	int retries = 0;
-
-	iov.iov_base = buf;
-	iov.iov_len = len;
- retry:
-	error = cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1);
-	if (error == CPG_ERR_TRY_AGAIN) {
-		retries++;
-		usleep(1000);
-		if (!(retries % 100))
-			log_error("cpg_mcast_joined retry %d", retries);
-		if (retries < 1000)
-			goto retry;
-	}
-	if (error != CPG_OK) {
-		log_error("cpg_mcast_joined error %d handle %llx",
-			  (int)error, (unsigned long long)h);
-		disable_deadlock();
-		return -1;
-	}
-	return 0;
-}
-
-static void send_message(struct lockspace *ls, int type)
+static void send_message(struct lockspace *ls, int type,
+			 uint32_t to_nodeid, uint32_t msgdata)
 {
 	struct dlm_header *hd;
 	int len;
@@ -961,16 +895,11 @@ static void send_message(struct lockspace *ls, int type)
 	memset(buf, 0, len);
 
 	hd = (struct dlm_header *)buf;
-	hd->version[0]  = cpu_to_le16(DLM_HEADER_MAJOR);
-	hd->version[1]  = cpu_to_le16(DLM_HEADER_MINOR);
-	hd->version[2]  = cpu_to_le16(DLM_HEADER_PATCH);
-	hd->type	= cpu_to_le16(type);
-	hd->nodeid      = cpu_to_le32(our_nodeid);
-	hd->to_nodeid   = 0;
-	hd->global_id   = cpu_to_le32(ls->global_id);
-	memcpy(hd->name, ls->name, strlen(ls->name));
+	hd->type = type;
+	hd->to_nodeid = to_nodeid;
+	hd->msgdata = msgdata;
 
-	_send_message(ls->cpg_h, buf, len, type);
+	dlm_send_message(ls, buf, len);
 
 	free(buf);
 }
@@ -978,43 +907,27 @@ static void send_message(struct lockspace *ls, int type)
 static void send_checkpoint_ready(struct lockspace *ls)
 {
 	log_group(ls, "send_checkpoint_ready");
-	send_message(ls, DLM_MSG_CHECKPOINT_READY);
+	send_message(ls, DLM_MSG_DEADLK_CHECKPOINT_READY, 0, 0);
 }
 
 void send_cycle_start(struct lockspace *ls)
 {
-	if (!deadlock_enabled)
-		return;
 	log_group(ls, "send_cycle_start");
-	send_message(ls, DLM_MSG_CYCLE_START);
+	send_message(ls, DLM_MSG_DEADLK_CYCLE_START, 0, 0);
 }
 
-void send_cycle_end(struct lockspace *ls)
+static void send_cycle_end(struct lockspace *ls)
 {
-	if (!deadlock_enabled)
-		return;
 	log_group(ls, "send_cycle_end");
-	send_message(ls, DLM_MSG_CYCLE_END);
+	send_message(ls, DLM_MSG_DEADLK_CYCLE_END, 0, 0);
 }
 
 static void send_cancel_lock(struct lockspace *ls, struct trans *tr,
 			     struct dlm_lkb *lkb)
 {
-	struct dlm_header *hd;
-	int len;
-	char *buf;
 	int to_nodeid;
 	uint32_t lkid;
 
-	len = sizeof(struct dlm_header);
-	buf = malloc(len);
-	if (!buf) {
-		log_error("send_message: no memory");
-		disable_deadlock();
-		return;
-	}
-	memset(buf, 0, len);
-
 	if (!lkb->lock.nodeid)
 		lkid = lkb->lock.id;
 	else
@@ -1025,20 +938,7 @@ static void send_cancel_lock(struct lockspace *ls, struct trans *tr,
 		  to_nodeid, lkb->rsb->name, lkid,
 		  (unsigned long long)lkb->lock.xid);
 
-	hd = (struct dlm_header *)buf;
-	hd->version[0]  = cpu_to_le16(DLM_HEADER_MAJOR);
-	hd->version[1]  = cpu_to_le16(DLM_HEADER_MINOR);
-	hd->version[2]  = cpu_to_le16(DLM_HEADER_PATCH);
-	hd->type	= cpu_to_le16(DLM_MSG_CANCEL_LOCK);
-	hd->nodeid      = cpu_to_le32(our_nodeid);
-	hd->to_nodeid   = cpu_to_le32(to_nodeid);
-	hd->lkid        = cpu_to_le32(lkid);
-	hd->global_id   = cpu_to_le32(ls->global_id);
-	memcpy(hd->name, ls->name, strlen(ls->name));
-
-	_send_message(ls->cpg_h, buf, len, DLM_MSG_CANCEL_LOCK);
-
-	free(buf);
+	send_message(ls, DLM_MSG_DEADLK_CANCEL_LOCK, to_nodeid, lkid);
 }
 
 static void dump_resources(struct lockspace *ls)
@@ -1075,7 +975,7 @@ static void run_deadlock(struct lockspace *ls)
 	if (ls->all_checkpoints_ready)
 		log_group(ls, "WARNING: run_deadlock all_checkpoints_ready");
 
-	list_for_each_entry(node, &ls->nodes, list) {
+	list_for_each_entry(node, &ls->deadlk_nodes, list) {
 		if (!node->in_cycle)
 			continue;
 		if (!node->checkpoint_ready)
@@ -1089,13 +989,13 @@ static void run_deadlock(struct lockspace *ls)
 
 	ls->all_checkpoints_ready = 1;
 
-	list_for_each_entry(node, &ls->nodes, list) {
+	list_for_each_entry(node, &ls->deadlk_nodes, list) {
 		if (!node->in_cycle)
 			continue;
 		if (node->nodeid < low || low == -1)
 			low = node->nodeid;
 	}
-	ls->low_nodeid = low;
+	ls->deadlk_low_nodeid = low;
 
 	if (low == our_nodeid)
 		find_deadlock(ls);
@@ -1103,15 +1003,17 @@ static void run_deadlock(struct lockspace *ls)
 		log_group(ls, "defer resolution to low nodeid %d", low);
 }
 
-static void receive_checkpoint_ready(struct lockspace *ls, int nodeid)
+void receive_checkpoint_ready(struct lockspace *ls, struct dlm_header *hd,
+			      int len)
 {
 	struct node *node;
+	int nodeid = hd->nodeid;
 
 	log_group(ls, "receive_checkpoint_ready from %d", nodeid);
 
 	read_checkpoint(ls, nodeid);
 
-	list_for_each_entry(node, &ls->nodes, list) {
+	list_for_each_entry(node, &ls->deadlk_nodes, list) {
 		if (node->nodeid == nodeid) {
 			node->checkpoint_ready = 1;
 			break;
@@ -1121,9 +1023,10 @@ static void receive_checkpoint_ready(struct lockspace *ls, int nodeid)
 	run_deadlock(ls);
 }
 
-static void receive_cycle_start(struct lockspace *ls, int nodeid)
+void receive_cycle_start(struct lockspace *ls, struct dlm_header *hd, int len)
 {
 	struct node *node;
+	int nodeid = hd->nodeid;
 	int rv;
 
 	log_group(ls, "receive_cycle_start from %d", nodeid);
@@ -1135,7 +1038,7 @@ static void receive_cycle_start(struct lockspace *ls, int nodeid)
 	ls->cycle_running = 1;
 	gettimeofday(&ls->cycle_start_time, NULL);
 
-	list_for_each_entry(node, &ls->nodes, list)
+	list_for_each_entry(node, &ls->deadlk_nodes, list)
 		node->in_cycle = 1;
 
 	rv = read_debugfs_locks(ls);
@@ -1166,9 +1069,10 @@ static uint64_t dt_usec(struct timeval *start, struct timeval *stop)
 /* TODO: nodes added during a cycle - what will they do with messages
    they recv from other nodes running the cycle? */
 
-static void receive_cycle_end(struct lockspace *ls, int nodeid)
+void receive_cycle_end(struct lockspace *ls, struct dlm_header *hd, int len)
 {
 	struct node *node;
+	int nodeid = hd->nodeid;
 	uint64_t usec;
 
 	if (!ls->cycle_running) {
@@ -1185,7 +1089,7 @@ static void receive_cycle_end(struct lockspace *ls, int nodeid)
 	ls->cycle_running = 0;
 	ls->all_checkpoints_ready = 0;
 
-	list_for_each_entry(node, &ls->nodes, list)
+	list_for_each_entry(node, &ls->deadlk_nodes, list)
 		node->checkpoint_ready = 0;
 
 	free_resources(ls);
@@ -1193,9 +1097,11 @@ static void receive_cycle_end(struct lockspace *ls, int nodeid)
 	unlink_checkpoint(ls);
 }
 
-static void receive_cancel_lock(struct lockspace *ls, int nodeid, uint32_t lkid)
+void receive_cancel_lock(struct lockspace *ls, struct dlm_header *hd, int len)
 {
 	dlm_lshandle_t h;
+	int nodeid = hd->nodeid;
+	uint32_t lkid = hd->msgdata;
 	int rv;
 
 	if (nodeid != our_nodeid)
@@ -1219,51 +1125,6 @@ static void receive_cancel_lock(struct lockspace *ls, int nodeid, uint32_t lkid)
 	dlm_close_lockspace(h);
 }
 
-static void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name,
-		uint32_t nodeid, uint32_t pid, void *data, int data_len)
-{
-	struct lockspace *ls;
-	struct dlm_header *hd;
-
-	ls = find_ls_by_handle(handle);
-	if (!ls)
-		return;
-
-	hd = (struct dlm_header *) data;
-
-	hd->version[0]  = le16_to_cpu(hd->version[0]);
-	hd->version[1]  = le16_to_cpu(hd->version[1]);
-	hd->version[2]  = le16_to_cpu(hd->version[2]);
-	hd->type	= le16_to_cpu(hd->type);
-	hd->nodeid      = le32_to_cpu(hd->nodeid);
-	hd->to_nodeid   = le32_to_cpu(hd->to_nodeid);
-	hd->global_id   = le32_to_cpu(hd->global_id);
-
-	if (hd->version[0] != DLM_HEADER_MAJOR) {
-		log_error("reject message version %u.%u.%u",
-			  hd->version[0], hd->version[1], hd->version[2]);
-		return;
-	}
-
-	switch (hd->type) {
-	case DLM_MSG_CYCLE_START:
-		receive_cycle_start(ls, hd->nodeid);
-		break;
-	case DLM_MSG_CYCLE_END:
-		receive_cycle_end(ls, hd->nodeid);
-		break;
-	case DLM_MSG_CHECKPOINT_READY:
-		receive_checkpoint_ready(ls, hd->nodeid);
-		break;
-	case DLM_MSG_CANCEL_LOCK:
-		receive_cancel_lock(ls, hd->nodeid, hd->lkid);
-		break;
-	default:
-		log_error("unknown message type %d from %d",
-			  hd->type, hd->nodeid);
-	}
-}
-
 static void node_joined(struct lockspace *ls, int nodeid)
 {
 	struct node *node;
@@ -1276,7 +1137,7 @@ static void node_joined(struct lockspace *ls, int nodeid)
 	}
 	memset(node, 0, sizeof(struct node));
 	node->nodeid = nodeid;
-	list_add_tail(&node->list, &ls->nodes);
+	list_add_tail(&node->list, &ls->deadlk_nodes);
 	log_group(ls, "node %d joined deadlock cpg", nodeid);
 }
 
@@ -1284,7 +1145,7 @@ static void node_left(struct lockspace *ls, int nodeid, int reason)
 {
 	struct node *node, *safe;
 
-	list_for_each_entry_safe(node, safe, &ls->nodes, list) {
+	list_for_each_entry_safe(node, safe, &ls->deadlk_nodes, list) {
 		if (node->nodeid != nodeid)
 			continue;
 
@@ -1296,20 +1157,15 @@ static void node_left(struct lockspace *ls, int nodeid, int reason)
 
 static void purge_locks(struct lockspace *ls, int nodeid);
 
-static void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
+static void deadlk_confchg(struct lockspace *ls,
 		struct cpg_address *member_list, int member_list_entries,
 		struct cpg_address *left_list, int left_list_entries,
 		struct cpg_address *joined_list, int joined_list_entries)
 {
-	struct lockspace *ls;
 	int i;
 
-	ls = find_ls_by_handle(handle);
-	if (!ls)
-		return;
-
-	if (!ls->got_first_confchg) {
-		ls->got_first_confchg = 1;
+	if (!ls->deadlk_confchg_init) {
+		ls->deadlk_confchg_init = 1;
 		for (i = 0; i < member_list_entries; i++)
 			node_joined(ls, member_list[i].nodeid);
 		return;
@@ -1339,7 +1195,7 @@ static void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
 		purge_locks(ls, left_list[i].nodeid);
 
 	for (i = 0; i < left_list_entries; i++) {
-		if (left_list[i].nodeid != ls->low_nodeid)
+		if (left_list[i].nodeid != ls->deadlk_low_nodeid)
 			continue;
 		/* this will set a new low node which will call find_deadlock */
 		run_deadlock(ls);
@@ -1347,112 +1203,6 @@ static void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
 	}
 }
 
-static void process_deadlock_cpg(int ci)
-{
-	struct lockspace *ls;
-	cpg_error_t error;
-
-	ls = get_client_lockspace(ci);
-	if (!ls)
-		return;
-
-	error = cpg_dispatch(ls->cpg_h, CPG_DISPATCH_ONE);
-	if (error != CPG_OK)
-		log_error("cpg_dispatch error %d", error);
-}
-
-cpg_callbacks_t ls_callbacks = {
-	.cpg_deliver_fn = deliver_cb,
-	.cpg_confchg_fn = confchg_cb,
-};
-
-static void make_cpgname(struct lockspace *ls, struct cpg_name *cn)
-{
-	char name[MAXNAME+8];
-
-	memset(name, 0, sizeof(name));
-	strncpy(name, ls->name, sizeof(name));
-	strncat(name, "_deadlk", 7);
-	memset(cn, 0, sizeof(struct cpg_name));
-	strncpy(cn->value, name, strlen(name) + 1);
-	cn->length = strlen(name) + 1;
-}
-
-void join_deadlock_cpg(struct lockspace *ls)
-{
-	cpg_handle_t h;
-	struct cpg_name cpgname;
-	cpg_error_t error;
-	int retries = 0;
-	int fd, ci;
-
-	if (!deadlock_enabled)
-		return;
-
-	unlink_checkpoint(ls); /* not sure about this */
-
-	error = cpg_initialize(&h, &ls_callbacks);
-	if (error != CPG_OK) {
-		log_error("cpg_initialize error %d", error);
-		return;
-	}
-
-	cpg_fd_get(h, &fd);
-	if (fd < 0) {
-		log_error("cpg_fd_get error %d", error);
-		return;
-	}
-
-	ci = client_add(fd, process_deadlock_cpg, NULL);
-
-	make_cpgname(ls, &cpgname);
-
- retry:
-	error = cpg_join(h, &cpgname);
-	if (error == CPG_ERR_TRY_AGAIN) {
-		sleep(1);
-		if (retries++ < 10)
-			goto retry;
-	}
-	if (error != CPG_OK) {
-		log_error("deadlk cpg join error %d", error);
-		goto fail;
-	}
-
-	ls->cpg_h = h;
-	ls->cpg_ci = ci;
-	set_client_lockspace(ci, ls);
-	log_group(ls, "deadlk cpg ci %d fd %d", ci, fd);
-	return;
- fail:
-	cpg_finalize(h);
-	client_dead(ci);
-}
-
-void leave_deadlock_cpg(struct lockspace *ls)
-{
-	struct cpg_name cpgname;
-	cpg_error_t error;
-	int retries = 0;
-
-	if (!deadlock_enabled)
-		return;
-
-	make_cpgname(ls, &cpgname);
- retry:
-	error = cpg_leave(ls->cpg_h, &cpgname);
-	if (error == CPG_ERR_TRY_AGAIN) {
-		sleep(1);
-		if (retries++ < 10)
-			goto retry;
-	}
-	if (error != CPG_OK)
-		log_error("deadlk cpg leave error %d", error);
-
-	cpg_finalize(ls->cpg_h);
-	client_dead(ls->cpg_ci);
-}
-
 /* would we ever call this after we've created the transaction lists?
    I don't think so; I think it can only be called between reading
    checkpoints */
diff --git a/group/dlm_controld/dlm_daemon.h b/group/dlm_controld/dlm_daemon.h
index c164a81..d5657dd 100644
--- a/group/dlm_controld/dlm_daemon.h
+++ b/group/dlm_controld/dlm_daemon.h
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -41,6 +41,7 @@
 #include <sched.h>
 #include <signal.h>
 #include <sys/time.h>
+#include <dirent.h>
 
 #include <openais/saAis.h>
 #include <openais/saCkpt.h>
@@ -49,7 +50,6 @@
 #include "dlm_controld.h"
 #include "list.h"
 #include "linux_endian.h"
-#include "libgroup.h"
 
 #define MAXARGS		8
 #define MAXLINE		256
@@ -57,16 +57,38 @@
 #define MAXNAME		255
 #define MAX_NODES	256 /* should be same as MAX_GROUP_MEMBERS */
 #define MAX_NODE_ADDRESSES 4
+#define DUMP_SIZE	(1024 * 1024)
 
-extern char *prog_name;
 extern int daemon_debug_opt;
-extern int kernel_debug_opt;
+extern int daemon_quit;
+extern int poll_fencing;
+extern int poll_quorum;
+extern int poll_fs;
+extern int poll_ignore_plock;
+extern int plock_fd;
+extern int plock_ci;
+extern struct list_head lockspaces;
+extern int our_nodeid;
 extern char daemon_debug_buf[256];
+extern char dump_buf[DUMP_SIZE];
+extern int dump_point;
+extern int dump_wrap;
+
+void daemon_dump_save(void);
 
 #define log_debug(fmt, args...) \
 do { \
 	snprintf(daemon_debug_buf, 255, "%ld " fmt "\n", time(NULL), ##args); \
 	if (daemon_debug_opt) fprintf(stderr, "%s", daemon_debug_buf); \
+	daemon_dump_save(); \
+} while (0)
+
+#define log_group(ls, fmt, args...) \
+do { \
+	snprintf(daemon_debug_buf, 255, "%ld %s " fmt "\n", time(NULL), \
+		 (ls)->name, ##args); \
+	if (daemon_debug_opt) fprintf(stderr, "%s", daemon_debug_buf); \
+	daemon_dump_save(); \
 } while (0)
 
 #define log_error(fmt, args...) \
@@ -75,73 +97,166 @@ do { \
 	syslog(LOG_ERR, fmt, ##args); \
 } while (0)
 
-#define log_group(ls, fmt, args...) \
+#define log_plock(ls, fmt, args...) \
 do { \
 	snprintf(daemon_debug_buf, 255, "%ld %s " fmt "\n", time(NULL), \
 		 (ls)->name, ##args); \
-	if (daemon_debug_opt) fprintf(stderr, "%s", daemon_debug_buf); \
+	if (daemon_debug_opt && cfgd_plock_debug) fprintf(stderr, "%s", daemon_debug_buf); \
 } while (0)
 
+/* dlm_header types: the values carried in dlm_header.type.  These travel
+   between nodes, so each value is written out and should stay stable. */
+enum {
+	DLM_MSG_START			= 1,
+	DLM_MSG_PLOCK			= 2,
+	DLM_MSG_PLOCK_OWN		= 3,
+	DLM_MSG_PLOCK_DROP		= 4,
+	DLM_MSG_PLOCK_SYNC_LOCK		= 5,
+	DLM_MSG_PLOCK_SYNC_WAITER	= 6,
+	DLM_MSG_PLOCKS_STORED		= 7,
+	DLM_MSG_DEADLK_CYCLE_START	= 8,
+	DLM_MSG_DEADLK_CYCLE_END	= 9,
+	DLM_MSG_DEADLK_CHECKPOINT_READY	= 10,
+	DLM_MSG_DEADLK_CANCEL_LOCK	= 11
+};
+
+/* dlm_header flags: bits in dlm_header.flags that accompany a start */
+#define DLM_MFLG_JOINING   (1 << 0)  /* sender is joining */
+#define DLM_MFLG_HAVEPLOCK (1 << 1)  /* sender has plock state */
+
+/* Common header at the start of every message multicast on a lockspace
+   cpg.  The receiver converts these fields from little-endian to cpu
+   order in place before dispatching on type. */
+struct dlm_header {
+	uint16_t version[3];	/* protocol major, minor, patch */
+	uint16_t type;	  	/* DLM_MSG_ */
+	uint32_t nodeid;	/* sender */
+	uint32_t to_nodeid;     /* recipient, 0 for all */
+	uint32_t global_id;     /* global unique id for this lockspace */
+	uint32_t flags;		/* DLM_MFLG_ */
+	uint32_t msgdata;       /* in-header payload depends on MSG type; lkid
+				   for deadlock, seq for lockspace membership */
+	uint32_t pad1;		/* padding; NOTE(review): confirm unused */
+	uint64_t pad2;		/* padding; NOTE(review): confirm unused */
+};
 
 struct lockspace {
 	struct list_head	list;
 	char			name[MAXNAME+1];
 	uint32_t		global_id;
-	int			low_nodeid;
+
+	/* lockspace membership stuff */
+
+	cpg_handle_t		cpg_handle;
+	int			cpg_client;
+	int			cpg_fd;
 	int			joining;
-	int			cpg_ci;
-	cpg_handle_t		cpg_h;
-	SaCkptCheckpointHandleT lock_ckpt_handle;
+	int			leaving;
+	int			kernel_stopped;
+	uint32_t		change_seq;
+	struct change		*started_change;
+	struct list_head	changes;
+	struct list_head	node_history;
+
+	/* plock stuff */
+
+	int			plock_ckpt_node;
+	int			need_plocks;
+	int			save_plocks;
+	uint32_t		associated_mg_id;
+	struct list_head	saved_messages;
+	struct list_head	plock_resources;
+	time_t			last_checkpoint_time;
+	time_t			last_plock_time;
+	struct timeval		drop_resources_last;
+	uint64_t		plock_ckpt_handle;
+
+	/* deadlock stuff */
+
+	int			deadlk_low_nodeid;
+	struct list_head	deadlk_nodes;
+	uint64_t		deadlk_ckpt_handle;
+	int			deadlk_confchg_init;
 	struct list_head	transactions;
 	struct list_head	resources;
-	struct list_head	nodes;
 	struct timeval		cycle_start_time;
 	struct timeval		cycle_end_time;
 	struct timeval		last_send_cycle_start;
-	int			got_first_confchg;
 	int			cycle_running;
 	int			all_checkpoints_ready;
 };
 
 /* action.c */
-int set_control(char *name, int val);
-int set_event_done(char *name, int val);
+void set_associated_id(uint32_t mg_id);
+int set_sysfs_control(char *name, int val);
+int set_sysfs_event_done(char *name, int val);
+int set_sysfs_id(char *name, uint32_t id);
+int set_configfs_members(char *name, int new_count, int *new_members,
+			 int renew_count, int *renew_members);
+void clear_configfs(void);
 int add_configfs_node(int nodeid, char *addr, int addrlen, int local);
 void del_configfs_node(int nodeid);
-void clear_configfs(void);
-int set_members(char *name, int new_count, int *new_members);
-int set_id(char *name, uint32_t id);
-void set_ccs_options(void);
-int do_read(int fd, void *buf, size_t count);
-int do_write(int fd, void *buf, size_t count);
+int set_configfs_protocol(int proto);
+int set_configfs_timewarn(int cs);
+int set_configfs_debug(int val);
 
-/* member_xxx.c */
-int setup_member(void);
-void process_member(int ci);
-char *nodeid2name(int nodeid);
+/* cpg.c */
+int setup_cpg(void);
+void process_lockspace_changes(void);
+void dlm_send_message(struct lockspace *ls, char *buf, int len);
+int dlm_join_lockspace(struct lockspace *ls);
+int dlm_leave_lockspace(struct lockspace *ls);
+char *msg_name(int type);
+void update_flow_control_status(void);
 
-/* group.c */
-int setup_groupd(void);
-void process_groupd(int ci);
+/* deadlock.c */
+void setup_deadlock(void);
+void send_cycle_start(struct lockspace *ls);
+void receive_checkpoint_ready(struct lockspace *ls, struct dlm_header *hd,
+			      int len);
+void receive_cycle_start(struct lockspace *ls, struct dlm_header *hd, int len);
+void receive_cycle_end(struct lockspace *ls, struct dlm_header *hd, int len);
+void receive_cancel_lock(struct lockspace *ls, struct dlm_header *hd, int len);
 
 /* main.c */
-int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci));
+int do_read(int fd, void *buf, size_t count);
+int do_write(int fd, void *buf, size_t count);
 void client_dead(int ci);
-void set_client_lockspace(int ci, struct lockspace *ls);
-struct lockspace *get_client_lockspace(int ci);
-struct lockspace *create_ls(char *name);
+int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci));
+int client_fd(int ci);
+void client_ignore(int ci, int fd);
+void client_back(int ci, int fd);
 struct lockspace *find_ls(char *name);
+struct lockspace *find_ls_id(uint32_t id);
 char *dlm_mode_str(int mode);
 
 /* member_cman.c */
-int is_cman_member(int nodeid);
+int setup_cman(void);
+void process_cman(int ci);
 void cman_statechange(void);
+int is_cman_member(int nodeid);
+char *nodeid2name(int nodeid);
 
-/* deadlock.c */
-void setup_deadlock(void);
-void join_deadlock_cpg(struct lockspace *ls);
-void leave_deadlock_cpg(struct lockspace *ls);
-void send_cycle_start(struct lockspace *ls);
+/* netlink.c */
+int setup_netlink(void);
+void process_netlink(int ci);
+
+/* plock.c */
+int setup_plocks(void);
+void process_plocks(int ci);
+int limit_plocks(void);
+void receive_plock(struct lockspace *ls, struct dlm_header *hd, int len);
+void receive_own(struct lockspace *ls, struct dlm_header *hd, int len);
+void receive_sync(struct lockspace *ls, struct dlm_header *hd, int len);
+void receive_drop(struct lockspace *ls, struct dlm_header *hd, int len);
+void process_saved_plocks(struct lockspace *ls);
+void close_plock_checkpoint(struct lockspace *ls);
+void store_plocks(struct lockspace *ls);
+void retrieve_plocks(struct lockspace *ls);
+void purge_plocks(struct lockspace *ls, int nodeid, int unmount);
+int dump_plocks(char *name, int fd);
+
+/* group.c */
+int setup_groupd(void);
+void process_groupd(int ci);
+int dlm_join_lockspace_group(struct lockspace *ls);
+int dlm_leave_lockspace_group(struct lockspace *ls);
 
 #endif
 
diff --git a/group/dlm_controld/group.c b/group/dlm_controld/group.c
index 2e5b4e6..8dc68a1 100644
--- a/group/dlm_controld/group.c
+++ b/group/dlm_controld/group.c
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -11,6 +11,7 @@
 ******************************************************************************/
 
 #include "dlm_daemon.h"
+#include "libgroup.h"
 
 #define DO_STOP 1
 #define DO_START 2
@@ -84,7 +85,7 @@ group_callbacks_t callbacks = {
 	setid_cbfn
 };
 
-char *str_members(void)
+static char *str_members(void)
 {
 	static char str_members_buf[MAXLINE];
 	int i, ret, pos = 0, len = MAXLINE;
@@ -127,7 +128,7 @@ void process_groupd(int ci)
 	switch (cb_action) {
 	case DO_STOP:
 		log_debug("groupd callback: stop %s", cb_name);
-		set_control(cb_name, 0);
+		set_sysfs_control(cb_name, 0);
 		group_stop_done(gh, cb_name);
 		break;
 
@@ -135,12 +136,13 @@ void process_groupd(int ci)
 		log_debug("groupd callback: start %s count %d members %s",
 			  cb_name, cb_member_count, str_members());
 
-		set_members(cb_name, cb_member_count, cb_members);
+		set_configfs_members(cb_name, cb_member_count, cb_members,
+				     0, NULL);
 
 		/* this causes the dlm to do a "start" using the
 		   members we just set */
 
-		set_control(cb_name, 1);
+		set_sysfs_control(cb_name, 1);
 
 		/* the dlm doesn't need/use a "finish" stage following
 		   start, so we can just do start_done immediately */
@@ -155,14 +157,13 @@ void process_groupd(int ci)
 
 		/* this causes the dlm_new_lockspace() call (typically from
 		   mount) to complete */
-		set_event_done(cb_name, 0);
+		set_sysfs_event_done(cb_name, 0);
 
-		join_deadlock_cpg(ls);
 		break;
 
 	case DO_SETID:
 		log_debug("groupd callback: set_id %s %x", cb_name, cb_id);
-		set_id(cb_name, cb_id);
+		set_sysfs_id(cb_name, cb_id);
 		ls->global_id = cb_id;
 		break;
 
@@ -178,11 +179,10 @@ void process_groupd(int ci)
 			log_debug("leave event done %s", cb_name);
 
 			/* remove everything under configfs */
-			set_members(cb_name, 0, NULL);
+			set_configfs_members(ls->name, 0, NULL, 0, NULL);
 		}
 
-		set_event_done(cb_name, val);
-		leave_deadlock_cpg(ls);
+		set_sysfs_event_done(cb_name, val);
 		list_del(&ls->list);
 		free(ls);
 		break;
@@ -200,6 +200,29 @@ void process_groupd(int ci)
 	return;
 }
 
+/*
+ * Mark ls as joining, link it onto the global lockspaces list and call
+ * group_join() with the lockspace name.  Returns 0 on success.  On
+ * failure ls is unlinked and freed here and group_join()'s nonzero
+ * result is returned, so the caller must not touch ls afterwards.
+ */
+int dlm_join_lockspace_group(struct lockspace *ls)
+{
+	int error;
+
+	ls->joining = 1;
+	list_add(&ls->list, &lockspaces);
+
+	error = group_join(gh, ls->name);
+	if (!error)
+		return 0;
+
+	/* undo the registration above; ownership of ls ends here */
+	list_del(&ls->list);
+	free(ls);
+	return error;
+}
+
+/*
+ * Mark ls as leaving and call group_leave() with the lockspace name.
+ * Returns group_leave()'s result (0 on success).  A failure is logged
+ * here so it is not silently lost when the caller ignores the result.
+ */
+int dlm_leave_lockspace_group(struct lockspace *ls)
+{
+	int rv;
+
+	ls->leaving = 1;
+
+	rv = group_leave(gh, ls->name);
+	if (rv)
+		log_error("group_leave %s error %d", ls->name, rv);
+	return rv;
+}
+
 int setup_groupd(void)
 {
 	int rv;
diff --git a/group/dlm_controld/main.c b/group/dlm_controld/main.c
index a83eea0..0e4bc15 100644
--- a/group/dlm_controld/main.c
+++ b/group/dlm_controld/main.c
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -11,29 +11,22 @@
 ******************************************************************************/
 
 #include "dlm_daemon.h"
-
+#include "config.h"
+#include <linux/dlm.h>
 #include <linux/netlink.h>
 #include <linux/genetlink.h>
-#include <linux/dlm.h>
 #include <linux/dlm_netlink.h>
 
-#define OPTION_STRING			"KDhVd:"
-#define LOCKFILE_NAME			"/var/run/dlm_controld.pid"
-
-#define DEADLOCK_CHECK_SECS		10
-
-#define NALLOC 16
-
-struct list_head lockspaces;
-
-extern group_handle_t gh;
-extern int deadlock_enabled;
+#define LOCKFILE_NAME	"/var/run/dlm_controld.pid"
+#define CLIENT_NALLOC	32
+#define GROUP_LIBGROUP	2
+#define GROUP_LIBCPG	3
 
-static int daemon_quit;
 static int client_maxi;
 static int client_size = 0;
 static struct client *client = NULL;
 static struct pollfd *pollfd = NULL;
+static int group_mode;
 
 struct client {
 	int fd;
@@ -42,17 +35,72 @@ struct client {
 	struct lockspace *ls;
 };
 
+/*
+ * Read exactly count bytes from fd into buf, retrying on EINTR and after
+ * short reads.  Returns 0 once all bytes have been read, or -1 on a read
+ * error or if end-of-file is reached first.
+ */
+int do_read(int fd, void *buf, size_t count)
+{
+	char *p = buf;	/* byte pointer: arithmetic on void * is a GNU extension */
+	size_t off = 0;
+	ssize_t rv;
+
+	while (off < count) {
+		rv = read(fd, p + off, count - off);
+		if (rv == 0)
+			return -1;
+		if (rv == -1 && errno == EINTR)
+			continue;
+		if (rv == -1)
+			return -1;
+		off += rv;
+	}
+	return 0;
+}
+
+/*
+ * Write all count bytes of buf to fd, retrying on EINTR and continuing
+ * after short writes.  Returns 0 on success; on a write error it logs
+ * errno and returns -1.
+ */
+int do_write(int fd, void *buf, size_t count)
+{
+	char *p = buf;	/* byte pointer: arithmetic on void * is a GNU extension */
+	size_t off = 0;
+	ssize_t rv;
+
+	while (off < count) {
+		rv = write(fd, p + off, count - off);
+		if (rv == -1 && errno == EINTR)
+			continue;
+		if (rv < 0) {
+			log_error("write errno %d", errno);
+			return -1;
+		}
+		off += rv;
+	}
+	return 0;
+}
+
+/*
+ * Write the saved debug log (dump_buf) to fd, oldest data first.  Once the
+ * ring buffer has wrapped, the oldest bytes run from dump_point to the end
+ * of the buffer and the newest from the start up to dump_point, so two
+ * writes are needed.  Both writes carry explicit lengths, so nothing is
+ * stored into dump_buf here: writing '\0' at dump_point would clobber the
+ * oldest retained byte and could show up as a NUL in a later dump.
+ */
+static void do_dump(int fd)
+{
+	if (dump_wrap)
+		do_write(fd, dump_buf + dump_point, DUMP_SIZE - dump_point);
+
+	do_write(fd, dump_buf, dump_point);
+}
+
 static void client_alloc(void)
 {
 	int i;
 
 	if (!client) {
-		client = malloc(NALLOC * sizeof(struct client));
-		pollfd = malloc(NALLOC * sizeof(struct pollfd));
+		client = malloc(CLIENT_NALLOC * sizeof(struct client));
+		pollfd = malloc(CLIENT_NALLOC * sizeof(struct pollfd));
 	} else {
-		client = realloc(client, (client_size + NALLOC) *
+		client = realloc(client, (client_size + CLIENT_NALLOC) *
 					 sizeof(struct client));
-		pollfd = realloc(pollfd, (client_size + NALLOC) *
+		pollfd = realloc(pollfd, (client_size + CLIENT_NALLOC) *
 					 sizeof(struct pollfd));
 		if (!pollfd)
 			log_error("can't alloc for pollfd");
@@ -60,14 +108,14 @@ static void client_alloc(void)
 	if (!client || !pollfd)
 		log_error("can't alloc for client array");
 
-	for (i = client_size; i < client_size + NALLOC; i++) {
+	for (i = client_size; i < client_size + CLIENT_NALLOC; i++) {
 		client[i].workfn = NULL;
 		client[i].deadfn = NULL;
 		client[i].fd = -1;
 		pollfd[i].fd = -1;
 		pollfd[i].revents = 0;
 	}
-	client_size += NALLOC;
+	client_size += CLIENT_NALLOC;
 }
 
 void client_dead(int ci)
@@ -105,14 +153,21 @@ int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci))
 	goto again;
 }
 
-void set_client_lockspace(int ci, struct lockspace *ls)
+/* Return the file descriptor registered in client slot ci. */
+int client_fd(int ci)
 {
-	client[ci].ls = ls;
+	return client[ci].fd;
 }
 
-struct lockspace *get_client_lockspace(int ci)
+/*
+ * Stop polling client slot ci: poll() ignores entries whose fd is
+ * negative.  client[ci].fd is left untouched and the fd argument is not
+ * used here; the caller keeps the fd to pass back to client_back().
+ */
+void client_ignore(int ci, int fd)
 {
-	return client[ci].ls;
+	pollfd[ci].fd = -1;
+	pollfd[ci].events = 0;
+}
+
+/* Resume polling fd for input (POLLIN) in client slot ci. */
+void client_back(int ci, int fd)
+{
+	pollfd[ci].fd = fd;
+	pollfd[ci].events = POLLIN;
 }
 
 static void sigterm_handler(int sig)
@@ -120,7 +175,7 @@ static void sigterm_handler(int sig)
 	daemon_quit = 1;
 }
 
-struct lockspace *create_ls(char *name)
+static struct lockspace *create_ls(char *name)
 {
 	struct lockspace *ls;
 
@@ -129,9 +184,14 @@ struct lockspace *create_ls(char *name)
 		goto out;
 	memset(ls, 0, sizeof(*ls));
 	strncpy(ls->name, name, MAXNAME);
+
+	INIT_LIST_HEAD(&ls->changes);
+	INIT_LIST_HEAD(&ls->node_history);
+	INIT_LIST_HEAD(&ls->saved_messages);
+	INIT_LIST_HEAD(&ls->plock_resources);
+	INIT_LIST_HEAD(&ls->deadlk_nodes);
 	INIT_LIST_HEAD(&ls->transactions);
 	INIT_LIST_HEAD(&ls->resources);
-	INIT_LIST_HEAD(&ls->nodes);
  out:
 	return ls;
 }
@@ -231,7 +291,7 @@ static void process_uevent(int ci)
 		return;
 	if (rv < 0) {
 		log_error("uevent recv error %d errno %d", rv, errno);
-		goto out;
+		return;
 	}
 
 	if (!strstr(buf, "dlm"))
@@ -250,6 +310,8 @@ static void process_uevent(int ci)
 
 	log_debug("kernel: %s %s", act, argv[3]);
 
+	rv = 0;
+
 	if (!strcmp(act, "online@")) {
 		ls = find_ls(argv[3]);
 		if (ls) {
@@ -263,10 +325,14 @@ static void process_uevent(int ci)
 			goto out;
 		}
 
-		ls->joining = 1;
-		list_add(&ls->list, &lockspaces);
-
-		rv = group_join(gh, argv[3]);
+		if (group_mode == GROUP_LIBGROUP)
+			rv = dlm_join_lockspace_group(ls);
+		else
+			rv = dlm_join_lockspace(ls);
+		if (rv) {
+			/* ls already freed */
+			goto out;
+		}
 
 	} else if (!strcmp(act, "offline@")) {
 		ls = find_ls(argv[3]);
@@ -275,9 +341,11 @@ static void process_uevent(int ci)
 			goto out;
 		}
 
-		rv = group_leave(gh, argv[3]);
-	} else
-		rv = 0;
+		if (group_mode == GROUP_LIBGROUP)
+			dlm_leave_lockspace_group(ls);
+		else
+			dlm_leave_lockspace(ls);
+	}
  out:
 	if (rv < 0)
 		log_error("process_uevent %s error %d errno %d",
@@ -310,216 +378,7 @@ static int setup_uevent(void)
 	return s;
 }
 
-/* FIXME: look into using libnl/libnetlink */
-
-#define GENLMSG_DATA(glh)       ((void *)(NLMSG_DATA(glh) + GENL_HDRLEN))
-#define GENLMSG_PAYLOAD(glh)    (NLMSG_PAYLOAD(glh, 0) - GENL_HDRLEN)
-#define NLA_DATA(na)	    	((void *)((char*)(na) + NLA_HDRLEN))
-#define NLA_PAYLOAD(len)	(len - NLA_HDRLEN)
-
-/* Maximum size of response requested or message sent */
-#define MAX_MSG_SIZE    1024
-
-struct msgtemplate {
-	struct nlmsghdr n;
-	struct genlmsghdr g;
-	char buf[MAX_MSG_SIZE];
-};
-
-static int send_genetlink_cmd(int sd, uint16_t nlmsg_type, uint32_t nlmsg_pid,
-			      uint8_t genl_cmd, uint16_t nla_type,
-			      void *nla_data, int nla_len)
-{
-	struct nlattr *na;
-	struct sockaddr_nl nladdr;
-	int r, buflen;
-	char *buf;
-
-	struct msgtemplate msg;
-
-	msg.n.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN);
-	msg.n.nlmsg_type = nlmsg_type;
-	msg.n.nlmsg_flags = NLM_F_REQUEST;
-	msg.n.nlmsg_seq = 0;
-	msg.n.nlmsg_pid = nlmsg_pid;
-	msg.g.cmd = genl_cmd;
-	msg.g.version = 0x1;
-	na = (struct nlattr *) GENLMSG_DATA(&msg);
-	na->nla_type = nla_type;
-	na->nla_len = nla_len + 1 + NLA_HDRLEN;
-	if (nla_data)
-		memcpy(NLA_DATA(na), nla_data, nla_len);
-	msg.n.nlmsg_len += NLMSG_ALIGN(na->nla_len);
-
-	buf = (char *) &msg;
-	buflen = msg.n.nlmsg_len ;
-	memset(&nladdr, 0, sizeof(nladdr));
-	nladdr.nl_family = AF_NETLINK;
-	while ((r = sendto(sd, buf, buflen, 0, (struct sockaddr *) &nladdr,
-			   sizeof(nladdr))) < buflen) {
-		if (r > 0) {
-			buf += r;
-			buflen -= r;
-		} else if (errno != EAGAIN)
-			return -1;
-	}
-	return 0;
-}
-
-/*
- * Probe the controller in genetlink to find the family id
- * for the DLM family
- */
-static int get_family_id(int sd)
-{
-	char genl_name[100];
-	struct {
-		struct nlmsghdr n;
-		struct genlmsghdr g;
-		char buf[256];
-	} ans;
-
-	int id, rc;
-	struct nlattr *na;
-	int rep_len;
-
-	strcpy(genl_name, DLM_GENL_NAME);
-	rc = send_genetlink_cmd(sd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY,
-				CTRL_ATTR_FAMILY_NAME, (void *)genl_name,
-				strlen(DLM_GENL_NAME)+1);
-
-	rep_len = recv(sd, &ans, sizeof(ans), 0);
-	if (ans.n.nlmsg_type == NLMSG_ERROR ||
-	    (rep_len < 0) || !NLMSG_OK((&ans.n), rep_len))
-		return 0;
-
-	na = (struct nlattr *) GENLMSG_DATA(&ans);
-	na = (struct nlattr *) ((char *) na + NLA_ALIGN(na->nla_len));
-	if (na->nla_type == CTRL_ATTR_FAMILY_ID) {
-		id = *(uint16_t *) NLA_DATA(na);
-	}
-	return id;
-}
-
-/* genetlink messages are timewarnings used as part of deadlock detection */
-
-static int setup_netlink(void)
-{
-	struct sockaddr_nl snl;
-	int s, rv;
-	uint16_t id;
-
-	s = socket(AF_NETLINK, SOCK_RAW, NETLINK_GENERIC);
-	if (s < 0) {
-		log_error("generic netlink socket");
-		return s;
-	}
-
-	memset(&snl, 0, sizeof(snl));
-	snl.nl_family = AF_NETLINK;
-
-	rv = bind(s, (struct sockaddr *) &snl, sizeof(snl));
-	if (rv < 0) {
-		log_error("gen netlink bind error %d errno %d", rv, errno);
-		close(s);
-		return rv;
-	}
-
-	id = get_family_id(s);
-	if (!id) {
-		log_error("Error getting family id, errno %d", errno);
-		close(s);
-		return -1;
-	}
-
-	rv = send_genetlink_cmd(s, id, getpid(), DLM_CMD_HELLO, 0, NULL, 0);
-	if (rv < 0) {
-		log_error("error sending hello cmd, errno %d", errno);
-		close(s);
-		return -1;
-	}
-
-	return s;
-}
-
-static void process_timewarn(struct dlm_lock_data *data)
-{
-	struct lockspace *ls;
-	struct timeval now;
-	unsigned int sec;
-
-	ls = find_ls_id(data->lockspace_id);
-	if (!ls)
-		return;
-
-	data->resource_name[data->resource_namelen] = '\0';
-
-	log_group(ls, "timewarn: lkid %x pid %d name %s",
-		  data->id, data->ownpid, data->resource_name);
-
-	/* Problem: we don't want to get a timewarn, assume it's resolved
-	   by the current cycle, but in fact it's from a deadlock that
-	   formed after the checkpoints for the current cycle.  Then we'd
-	   have to hope for another warning (that may not come) to trigger
-	   a new cycle to catch the deadlock.  If our last cycle ckpt
-	   was say N (~5?) sec before we receive the timewarn, then we
-	   can be confident that the cycle included the lock in question.
-	   Otherwise, we're not sure if the warning is for a new deadlock
-	   that's formed since our last cycle ckpt (unless it's a long
-	   enough time since the last cycle that we're confident it *is*
-	   a new deadlock).  When there is a deadlock, I suspect it will
-	   be common to receive warnings before, during, and possibly
-	   after the cycle that resolves it.  Wonder if we should record
-	   timewarns and match them with deadlock cycles so we can tell
-	   which timewarns are addressed by a given cycle and which aren't.  */
-
-
-	gettimeofday(&now, NULL);
-
-	/* don't send a new start until at least SECS after the last
-	   we sent, and at least SECS after the last completed cycle */
-
-	sec = now.tv_sec - ls->last_send_cycle_start.tv_sec;
-
-	if (sec < DEADLOCK_CHECK_SECS) {
-		log_group(ls, "skip send: recent send cycle %d sec", sec);
-		return;
-	}
-
-	sec = now.tv_sec - ls->cycle_end_time.tv_sec;
-
-	if (sec < DEADLOCK_CHECK_SECS) {
-		log_group(ls, "skip send: recent cycle end %d sec", sec);
-		return;
-	}
-
-	gettimeofday(&ls->last_send_cycle_start, NULL);
-	send_cycle_start(ls);
-}
-
-static void process_netlink(int ci)
-{
-	struct msgtemplate msg;
-	struct nlattr *na;
-	int len;
-
-	len = recv(client[ci].fd, &msg, sizeof(msg), 0);
-
-	if (len < 0) {
-		log_error("nonfatal netlink error: errno %d", errno);
-		return;
-	}
-
-	if (msg.n.nlmsg_type == NLMSG_ERROR || !NLMSG_OK((&msg.n), len)) {
-		struct nlmsgerr *err = NLMSG_DATA(&msg);
-		log_error("fatal netlink error: errno %d", err->error);
-		return;
-	}
-
-	na = (struct nlattr *) GENLMSG_DATA(&msg);
-
-	process_timewarn((struct dlm_lock_data *) NLA_DATA(na));
-}
+/* FIXME: use a library?  Add ability to list lockspaces and their state. */
 
 static void process_connection(int ci)
 {
@@ -548,6 +407,12 @@ static void process_connection(int ci)
 			send_cycle_start(ls);
 		else
 			log_debug("deadlock_check ls name not found");
+	} else if (!strncmp(argv[0], "dump", 4)) {
+		do_dump(client[ci].fd);
+		client_dead(ci);
+	} else if (!strncmp(argv[0], "plocks", 6)) {
+		dump_plocks(argv[1], client[ci].fd);
+		client_dead(ci);
 	}
 }
 
@@ -601,7 +466,7 @@ static int setup_listener(void)
 	return s;
 }
 
-void cluster_dead(int ci)
+static void cluster_dead(int ci)
 {
 	log_error("cluster is down, exiting");
 	clear_configfs();
@@ -610,6 +475,7 @@ void cluster_dead(int ci)
 
 static int loop(void)
 {
+	int poll_timeout = -1;
 	int rv, i;
 	void (*workfn) (int ci);
 	void (*deadfn) (int ci);
@@ -619,34 +485,76 @@ static int loop(void)
 		goto out;
 	client_add(rv, process_listener, NULL);
 
-	rv = setup_groupd();
-	if (rv < 0)
-		goto out;
-	client_add(rv, process_groupd, cluster_dead);
-
 	rv = setup_uevent();
 	if (rv < 0)
 		goto out;
 	client_add(rv, process_uevent, NULL);
 
-	rv = setup_member();
+	rv = setup_cman();
 	if (rv < 0)
 		goto out;
-	client_add(rv, process_member, cluster_dead);
+	client_add(rv, process_cman, cluster_dead);
 
-	/* netlink stuff is only used for deadlock detection */
-	if (!deadlock_enabled)
-		goto for_loop;
+	group_mode = GROUP_LIBCPG;
 
-	rv = setup_netlink();
-	if (rv < 0)
-		goto for_loop;
-	client_add(rv, process_netlink, NULL);
+	if (cfgd_groupd_compat) {
+		rv = setup_groupd();
+		if (rv < 0)
+			goto out;
+		client_add(rv, process_groupd, cluster_dead);
+
+		group_mode = GROUP_LIBGROUP;
+
+		if (cfgd_groupd_compat == 2) {
+			/* cfgd_groupd_compat of 2 uses new groupd feature that
+			   figures out whether all other groupd's in the cluster
+			   are in LIBCPG mode, and if they are group_mode is
+			   changed to LIBCPG.  If any groupd in the cluster
+			   is from cluster2/stable2/rhel5, or any groupd is
+			   in LIBGROUP mode, then group_mode remains LIBGROUP.
+
+			   set_group_mode() figures this out by joining the
+			   groupd cpg, sending a new "mode" message, and
+			   waiting to see if it gets a mode reply from
+			   all other groupd's.  If it does, and all modes
+			   are LIBCPG, then we set group_mode to LIBCPG.
+			   Any previous generation groupd's won't recognize
+			   the new mode message and won't reply; their lack
+			   of reply (or seeing an old-style message from them)
+			   indicates they are a cluster2 version.
+
+			   Not yet implemented.  In the future, it may set
+			   group_mode to GROUP_LIBCPG. */
+
+			/* set_group_mode(); */
+			group_mode = GROUP_LIBGROUP;
+		}
+	}
+
+	if (group_mode == GROUP_LIBCPG) {
+		rv = setup_cpg();
+		if (rv < 0)
+			goto out;
+		/* client_add(rv, process_cpg, cluster_dead); */
 
- for_loop:
+		if (cfgd_enable_deadlk) {
+			rv = setup_netlink();
+			if (rv < 0)
+				goto out;
+			client_add(rv, process_netlink, NULL);
+
+			setup_deadlock();
+		}
+
+		rv = setup_plocks();
+		if (rv < 0)
+			goto out;
+		plock_fd = rv;
+		plock_ci = client_add(rv, process_plocks, NULL);
+	}
 
 	for (;;) {
-		rv = poll(pollfd, client_maxi + 1, -1);
+		rv = poll(pollfd, client_maxi + 1, poll_timeout);
 		if (rv == -1 && errno == EINTR) {
 			if (daemon_quit && list_empty(&lockspaces)) {
 				clear_configfs();
@@ -672,6 +580,21 @@ static int loop(void)
 				deadfn(i);
 			}
 		}
+
+		poll_timeout = -1;
+
+		if (poll_fencing || poll_quorum || poll_fs) {
+			process_lockspace_changes();
+			poll_timeout = 1000;
+		}
+
+		if (poll_ignore_plock) {
+			if (!limit_plocks()) {
+				poll_ignore_plock = 0;
+				client_back(plock_ci, plock_fd);
+			}
+			poll_timeout = 1000;
+		}
 	}
 	rv = 0;
  out:
@@ -721,7 +644,7 @@ static void lockfile(void)
 	}
 }
 
-void daemonize(void)
+static void daemonize(void)
 {
 	pid_t pid = fork();
 	if (pid < 0) {
@@ -745,22 +668,46 @@ static void print_usage(void)
 {
+	/* keep in sync with OPTION_STRING and read_arguments() */
 	printf("Usage:\n");
 	printf("\n");
-	printf("%s [options]\n", prog_name);
+	printf("dlm_controld [options]\n");
 	printf("\n");
 	printf("Options:\n");
 	printf("\n");
-	printf("  -d <num>     Enable (1) or disable (0, default) deadlock code\n");     
-	printf("  -D	       Enable debugging code and don't fork\n");
-	printf("  -K	       Enable kernel dlm debugging messages\n");
-	printf("  -h	       Print this help, then exit\n");
-	printf("  -V	       Print program version information, then exit\n");
+	printf("  -D		Enable daemon debugging and don't fork\n");
+	printf("  -K		Enable kernel dlm debugging messages\n");
+	printf("  -g <num>	groupd compatibility, 0 off, 1 on\n");
+	printf("		on: use libgroup, compat with cluster2/stable2/rhel5\n");
+	printf("		off: use libcpg, no backward compatibility\n");
+	printf("		Default is %d\n", DEFAULT_GROUPD_COMPAT);
+	printf("  -d <num>	Enable (1) or disable (0) deadlock code\n");
+	printf("		Default is %d\n", DEFAULT_ENABLE_DEADLK);
+	printf("  -p <num>	Enable (1) or disable (0) plock code\n");
+	printf("		Default is %d\n", DEFAULT_ENABLE_PLOCK);
+	printf("  -P		Enable plock debugging\n");
+	printf("  -l <limit>	Limit the rate of plock operations\n");
+	printf("		Default is %d, set to 0 for no limit\n", DEFAULT_PLOCK_RATE_LIMIT);
+	printf("  -o <n>	plock ownership, 1 enable, 0 disable\n");
+	printf("		Default is %d\n", DEFAULT_PLOCK_OWNERSHIP);
+	printf("  -t <ms>	plock drop resources time (milliseconds)\n");
+	printf("		Default is %u\n", DEFAULT_DROP_RESOURCES_TIME);
+	printf("  -c <num>	plock drop resources count\n");
+	printf("		Default is %u\n", DEFAULT_DROP_RESOURCES_COUNT);
+	printf("  -a <ms>	plock drop resources age (milliseconds)\n");
+	printf("		Default is %u\n", DEFAULT_DROP_RESOURCES_AGE);
+	printf("  -h		Print this help, then exit\n");
+	printf("  -V		Print program version information, then exit\n");
 }
 
-static void decode_arguments(int argc, char **argv)
+#define OPTION_STRING			"DKg:d:p:Pl:o:t:c:a:hV"
+
+static void read_arguments(int argc, char **argv)
 {
 	int cont = 1;
 	int optchar;
 
+	/* we don't allow these to be set on command line, should we? */
+	optk_timewarn = 0;
+	optk_timewarn = 0;
+
 	while (cont) {
 		optchar = getopt(argc, argv, OPTION_STRING);
 
@@ -770,8 +717,54 @@ static void decode_arguments(int argc, char **argv)
 			daemon_debug_opt = 1;
 			break;
 
+		case 'g':
+			optd_groupd_compat = 1;
+			cfgd_groupd_compat = atoi(optarg);
+			break;
+
 		case 'K':
-			kernel_debug_opt = 1;
+			optk_debug = 1;
+			cfgk_debug = 1;
+			break;
+
+		case 'd':
+			optd_enable_deadlk = 1;
+			cfgd_enable_deadlk = atoi(optarg);
+			break;
+
+		case 'p':
+			optd_enable_plock = 1;
+			cfgd_enable_plock = atoi(optarg);
+			break;
+
+		case 'P':
+			optd_plock_debug = 1;
+			cfgd_plock_debug = 1;
+			break;
+
+		case 'l':
+			optd_plock_rate_limit = 1;
+			cfgd_plock_rate_limit = atoi(optarg);
+			break;
+
+		case 'o':
+			optd_plock_ownership = 1;
+			cfgd_plock_ownership = atoi(optarg);
+			break;
+
+		case 't':
+			optd_drop_resources_time = 1;
+			cfgd_drop_resources_time = atoi(optarg);
+			break;
+
+		case 'c':
+			optd_drop_resources_count = 1;
+			cfgd_drop_resources_count = atoi(optarg);
+			break;
+
+		case 'a':
+			optd_drop_resources_age = 1;
+			cfgd_drop_resources_age = atoi(optarg);
 			break;
 
 		case 'h':
@@ -779,10 +772,6 @@ static void decode_arguments(int argc, char **argv)
 			exit(EXIT_SUCCESS);
 			break;
 
-		case 'd':
-			deadlock_enabled = atoi(optarg);
-			break;
-
 		case 'V':
 			printf("dlm_controld (built %s %s)\n", __DATE__, __TIME__);
 			/* printf("%s\n", REDHAT_COPYRIGHT); */
@@ -807,7 +796,7 @@ static void decode_arguments(int argc, char **argv)
 	}
 }
 
-void set_oom_adj(int val)
+static void set_oom_adj(int val)
 {
 	FILE *fp;
 
@@ -819,7 +808,7 @@ void set_oom_adj(int val)
 	fclose(fp);
 }
 
-void set_scheduler(void)
+static void set_scheduler(void)
 {
 	struct sched_param sched_param;
 	int rv;
@@ -839,34 +828,62 @@ void set_scheduler(void)
 
 int main(int argc, char **argv)
 {
-	prog_name = argv[0];
-
 	INIT_LIST_HEAD(&lockspaces);
 
-	decode_arguments(argc, argv);
+	read_arguments(argc, argv);
 
 	if (!daemon_debug_opt)
 		daemonize();
+	/* sigterm_handler only sets daemon_quit; loop() exits once it is
+	   set and no lockspaces remain */
+	signal(SIGTERM, sigterm_handler);
 
-	setup_deadlock();
+	read_ccs();
 
-	signal(SIGTERM, sigterm_handler);
+	/* clear configfs entries a previous instance of this daemon may
+	   have left behind */
+	clear_configfs();
+
+	/* the kernel has its own defaults for these values which we
+	   don't want to change unless these have been set; -1 means
+	   they have not been set on command line or config file */
+	if (cfgk_debug != -1)
+		set_configfs_debug(cfgk_debug);
+	if (cfgk_timewarn != -1)
+		set_configfs_timewarn(cfgk_timewarn);
+	if (cfgk_protocol != -1)
+		set_configfs_protocol(cfgk_protocol);
 
 	set_scheduler();
 	set_oom_adj(-16);
 
-	/* if this daemon was killed and the cluster shut down, and
-	   then the cluster brought back up and this daemon restarted,
-	   there will be old configfs entries we need to clear out */
-	clear_configfs();
+	return loop();
+}
 
-	set_ccs_options();
+/*
+ * Append the current contents of daemon_debug_buf to the circular
+ * dump_buf.  dump_point is the next write position; when it reaches
+ * DUMP_SIZE it wraps to 0 and dump_wrap is set, after which the oldest
+ * bytes are overwritten.
+ */
+void daemon_dump_save(void)
+{
+	int len, i;
 
-	return loop();
+	len = strlen(daemon_debug_buf);
+
+	for (i = 0; i < len; i++) {
+		dump_buf[dump_point++] = daemon_debug_buf[i];
+
+		if (dump_point == DUMP_SIZE) {
+			dump_point = 0;
+			dump_wrap = 1;
+		}
+	}
 }
 
-char *prog_name;
 int daemon_debug_opt;
+int daemon_quit;
+int poll_fencing;
+int poll_quorum;
+int poll_fs;
+int poll_ignore_plock;
+int plock_fd;
+int plock_ci;
+struct list_head lockspaces;
+int our_nodeid;
 char daemon_debug_buf[256];
-int kernel_debug_opt;
+char dump_buf[DUMP_SIZE];
+int dump_point;
+int dump_wrap;
 
diff --git a/group/dlm_controld/member_cman.c b/group/dlm_controld/member_cman.c
index 1ce180c..847351a 100644
--- a/group/dlm_controld/member_cman.c
+++ b/group/dlm_controld/member_cman.c
@@ -13,13 +13,11 @@
 #include <libcman.h>
 #include "dlm_daemon.h"
 
-int			our_nodeid;
 static cman_handle_t	ch;
 static cman_node_t      old_nodes[MAX_NODES];
 static int              old_node_count;
 static cman_node_t      cman_nodes[MAX_NODES];
 static int              cman_node_count;
-extern struct list_head lockspaces;
 
 static int is_member(cman_node_t *node_list, int count, int nodeid)
 {
@@ -147,7 +145,7 @@ static void member_callback(cman_handle_t h, void *private, int reason, int arg)
 	}
 }
 
-void process_member(int ci)
+void process_cman(int ci)
 {
 	int rv;
 
@@ -161,7 +159,7 @@ void process_member(int ci)
 	}
 }
 
-int setup_member(void)
+int setup_cman(void)
 {
 	cman_node_t node;
 	int rv, fd;
@@ -205,7 +203,8 @@ int setup_member(void)
 }
 
 /* Force re-read of cman nodes */
-void cman_statechange()
+/* Exported wrapper around statechange(); the "(void)" parameter list
+   makes this a real zero-argument prototype, unlike the old "()". */
+void cman_statechange(void)
 {
 	statechange();
 }
+
diff --git a/group/dlm_controld/netlink.c b/group/dlm_controld/netlink.c
new file mode 100644
index 0000000..b61337e
--- /dev/null
+++ b/group/dlm_controld/netlink.c
@@ -0,0 +1,237 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2008 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include "dlm_daemon.h"
+#include "config.h"
+#include <linux/dlm.h>
+#include <linux/netlink.h>
+#include <linux/genetlink.h>
+#include <linux/dlm_netlink.h>
+
+/* minimum number of seconds process_timewarn() waits after the last sent
+   cycle start, and after the last completed cycle, before sending another */
+#define DEADLOCK_CHECK_SECS		10
+
+/* FIXME: look into using libnl/libnetlink */
+
+/* Helpers for locating generic netlink payloads.  Arguments are fully
+   parenthesized, and pointer arithmetic is done on char * because
+   arithmetic on void * is a GNU extension. */
+#define GENLMSG_DATA(glh)       ((void *)((char *)NLMSG_DATA(glh) + GENL_HDRLEN))
+#define GENLMSG_PAYLOAD(glh)    (NLMSG_PAYLOAD(glh, 0) - GENL_HDRLEN)
+#define NLA_DATA(na)	    	((void *)((char *)(na) + NLA_HDRLEN))
+#define NLA_PAYLOAD(len)	((len) - NLA_HDRLEN)
+
+/* Maximum size of response requested or message sent */
+#define MAX_MSG_SIZE    1024
+
+/* One generic netlink message: netlink header, genetlink header, then
+   space for attributes. */
+struct msgtemplate {
+	struct nlmsghdr n;
+	struct genlmsghdr g;
+	char buf[MAX_MSG_SIZE];
+};
+
+/*
+ * Build one generic netlink request carrying a single attribute of type
+ * nla_type (nla_len bytes copied from nla_data, which may be NULL) and
+ * send it on socket sd.  Returns 0 on success or -1 on failure; errno is
+ * EINVAL when the attribute does not fit in a struct msgtemplate.
+ */
+static int send_genetlink_cmd(int sd, uint16_t nlmsg_type, uint32_t nlmsg_pid,
+			      uint8_t genl_cmd, uint16_t nla_type,
+			      void *nla_data, int nla_len)
+{
+	struct nlattr *na;
+	struct sockaddr_nl nladdr;
+	struct msgtemplate msg;
+	ssize_t r;
+	int buflen;
+	char *buf;
+
+	/* header, payload and the extra byte counted in nla_len below must
+	   all fit in msg.buf */
+	if (nla_len < 0 || nla_len > MAX_MSG_SIZE ||
+	    NLA_HDRLEN + NLA_ALIGN(nla_len + 1) > MAX_MSG_SIZE) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	/* zero the whole message so alignment padding and the extra
+	   attribute byte are not sent as uninitialized stack data */
+	memset(&msg, 0, sizeof(msg));
+
+	msg.n.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN);
+	msg.n.nlmsg_type = nlmsg_type;
+	msg.n.nlmsg_flags = NLM_F_REQUEST;
+	msg.n.nlmsg_seq = 0;
+	msg.n.nlmsg_pid = nlmsg_pid;
+	msg.g.cmd = genl_cmd;
+	msg.g.version = 0x1;
+	na = (struct nlattr *) GENLMSG_DATA(&msg);
+	na->nla_type = nla_type;
+	na->nla_len = nla_len + 1 + NLA_HDRLEN;
+	if (nla_data)
+		memcpy(NLA_DATA(na), nla_data, nla_len);
+	msg.n.nlmsg_len += NLMSG_ALIGN(na->nla_len);
+
+	buf = (char *) &msg;
+	buflen = msg.n.nlmsg_len;
+	memset(&nladdr, 0, sizeof(nladdr));
+	nladdr.nl_family = AF_NETLINK;
+
+	/* keep sending until the whole message is out; a signal or a full
+	   socket buffer (EINTR/EAGAIN) is retried rather than treated as
+	   failure */
+	while (buflen > 0) {
+		r = sendto(sd, buf, buflen, 0, (struct sockaddr *) &nladdr,
+			   sizeof(nladdr));
+		if (r > 0) {
+			buf += r;
+			buflen -= r;
+		} else if (r < 0 && (errno == EAGAIN || errno == EINTR)) {
+			continue;
+		} else {
+			return -1;
+		}
+	}
+	return 0;
+}
+
+/*
+ * Probe the controller in genetlink to find the family id
+ * for the DLM family
+ */
+/*
+ * Probe the controller in genetlink to find the family id for the DLM
+ * family.  Returns the id, or 0 if the request could not be sent or the
+ * reply was an error, truncated, or had no CTRL_ATTR_FAMILY_ID attribute.
+ */
+static int get_family_id(int sd)
+{
+	char genl_name[] = DLM_GENL_NAME;
+	struct {
+		struct nlmsghdr n;
+		struct genlmsghdr g;
+		char buf[256];
+	} ans;
+	struct nlattr *na;
+	int rep_len, attr_len;
+	uint16_t id;
+
+	/* sizeof(genl_name) includes the terminating NUL */
+	if (send_genetlink_cmd(sd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY,
+			       CTRL_ATTR_FAMILY_NAME, genl_name,
+			       sizeof(genl_name)) < 0)
+		return 0;
+
+	/* validate the length before reading any header field: ans is
+	   uninitialized if recv() fails */
+	rep_len = recv(sd, &ans, sizeof(ans), 0);
+	if (rep_len < 0 || !NLMSG_OK(&ans.n, rep_len) ||
+	    ans.n.nlmsg_type == NLMSG_ERROR ||
+	    ans.n.nlmsg_len < NLMSG_LENGTH(GENL_HDRLEN))
+		return 0;
+
+	/* walk the attributes instead of assuming FAMILY_ID is the second
+	   one, staying inside the bytes the kernel actually sent */
+	attr_len = ans.n.nlmsg_len - NLMSG_LENGTH(GENL_HDRLEN);
+	na = (struct nlattr *) GENLMSG_DATA(&ans);
+	while (attr_len >= NLA_HDRLEN && na->nla_len >= NLA_HDRLEN &&
+	       na->nla_len <= attr_len) {
+		if (na->nla_type == CTRL_ATTR_FAMILY_ID &&
+		    na->nla_len >= NLA_HDRLEN + sizeof(id)) {
+			memcpy(&id, NLA_DATA(na), sizeof(id));
+			return id;
+		}
+		attr_len -= NLA_ALIGN(na->nla_len);
+		na = (struct nlattr *) ((char *) na + NLA_ALIGN(na->nla_len));
+	}
+	return 0;
+}
+
+/* genetlink messages are timewarnings used as part of deadlock detection */
+
+/*
+ * Open and bind a NETLINK_GENERIC socket, look up the DLM genetlink family
+ * id and send DLM_CMD_HELLO on it.  Returns the socket fd on success.  On
+ * failure the error is logged, any opened socket is closed, and a negative
+ * value is returned.
+ */
+int setup_netlink(void)
+{
+	struct sockaddr_nl addr;
+	uint16_t family;
+	int fd, ret;
+
+	fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_GENERIC);
+	if (fd < 0) {
+		log_error("generic netlink socket");
+		return fd;
+	}
+
+	memset(&addr, 0, sizeof(addr));
+	addr.nl_family = AF_NETLINK;
+
+	ret = bind(fd, (struct sockaddr *) &addr, sizeof(addr));
+	if (ret < 0) {
+		log_error("gen netlink bind error %d errno %d", ret, errno);
+		goto fail;
+	}
+
+	family = get_family_id(fd);
+	if (family == 0) {
+		log_error("Error getting family id, errno %d", errno);
+		ret = -1;
+		goto fail;
+	}
+
+	if (send_genetlink_cmd(fd, family, getpid(), DLM_CMD_HELLO,
+			       0, NULL, 0) < 0) {
+		log_error("error sending hello cmd, errno %d", errno);
+		ret = -1;
+		goto fail;
+	}
+
+	return fd;
+
+ fail:
+	/* errno was logged above, before close() could change it */
+	close(fd);
+	return ret;
+}
+
+/*
+ * Handle one kernel timewarn: find the lockspace by id, log the lock, and
+ * (when cfgd_enable_deadlk is set) call send_cycle_start() unless a cycle
+ * start was sent, or a cycle ended, less than DEADLOCK_CHECK_SECS ago.
+ */
+static void process_timewarn(struct dlm_lock_data *data)
+{
+	struct lockspace *ls;
+	struct timeval now;
+	unsigned int sec;
+	int namelen;
+
+	ls = find_ls_id(data->lockspace_id);
+	if (!ls)
+		return;
+
+	/* resource_name is not NUL terminated and nothing guarantees that
+	   resource_namelen is smaller than the array, so clamp the length
+	   and print with %.*s rather than storing a terminator at
+	   resource_name[resource_namelen], which may be past the end */
+	namelen = data->resource_namelen;
+	if (namelen < 0)
+		namelen = 0;
+	if (namelen > (int) sizeof(data->resource_name))
+		namelen = sizeof(data->resource_name);
+
+	log_group(ls, "timewarn: lkid %x pid %d name %.*s",
+		  data->id, data->ownpid, namelen, data->resource_name);
+
+	/* Problem: we don't want to get a timewarn, assume it's resolved
+	   by the current cycle, but in fact it's from a deadlock that
+	   formed after the checkpoints for the current cycle.  Then we'd
+	   have to hope for another warning (that may not come) to trigger
+	   a new cycle to catch the deadlock.  If our last cycle ckpt
+	   was say N (~5?) sec before we receive the timewarn, then we
+	   can be confident that the cycle included the lock in question.
+	   Otherwise, we're not sure if the warning is for a new deadlock
+	   that's formed since our last cycle ckpt (unless it's a long
+	   enough time since the last cycle that we're confident it *is*
+	   a new deadlock).  When there is a deadlock, I suspect it will
+	   be common to receive warnings before, during, and possibly
+	   after the cycle that resolves it.  Wonder if we should record
+	   timewarns and match them with deadlock cycles so we can tell
+	   which timewarns are addressed by a given cycle and which aren't.  */
+
+	gettimeofday(&now, NULL);
+
+	/* don't send a new start until at least SECS after the last
+	   we sent, and at least SECS after the last completed cycle */
+
+	sec = now.tv_sec - ls->last_send_cycle_start.tv_sec;
+
+	if (sec < DEADLOCK_CHECK_SECS) {
+		log_group(ls, "skip send: recent send cycle %u sec", sec);
+		return;
+	}
+
+	sec = now.tv_sec - ls->cycle_end_time.tv_sec;
+
+	if (sec < DEADLOCK_CHECK_SECS) {
+		log_group(ls, "skip send: recent cycle end %u sec", sec);
+		return;
+	}
+
+	gettimeofday(&ls->last_send_cycle_start, NULL);
+
+	if (cfgd_enable_deadlk)
+		send_cycle_start(ls);
+}
+
+/*
+ * Receive one message on the genetlink socket registered for client ci
+ * and pass its struct dlm_lock_data attribute to process_timewarn().
+ * Receive errors, netlink error replies, truncated messages and messages
+ * too short to hold a struct dlm_lock_data are logged and dropped.
+ */
+void process_netlink(int ci)
+{
+	struct msgtemplate msg;
+	struct nlattr *na;
+	int len;
+	int fd;
+
+	fd = client_fd(ci);
+
+	len = recv(fd, &msg, sizeof(msg), 0);
+
+	if (len < 0) {
+		log_error("nonfatal netlink error: errno %d", errno);
+		return;
+	}
+
+	/* validate the length before reading any header field */
+	if (!NLMSG_OK(&msg.n, len)) {
+		log_error("nonfatal netlink error: short message len %d", len);
+		return;
+	}
+
+	if (msg.n.nlmsg_type == NLMSG_ERROR) {
+		struct nlmsgerr *err = NLMSG_DATA(&msg.n);
+		log_error("fatal netlink error: errno %d", err->error);
+		return;
+	}
+
+	/* process_timewarn() reads a whole struct dlm_lock_data from the
+	   first attribute, so the message and attribute must hold one */
+	if (msg.n.nlmsg_len < NLMSG_LENGTH(GENL_HDRLEN + NLA_HDRLEN +
+					   sizeof(struct dlm_lock_data))) {
+		log_error("nonfatal netlink error: message too short len %u",
+			  msg.n.nlmsg_len);
+		return;
+	}
+
+	na = (struct nlattr *) GENLMSG_DATA(&msg);
+	if (na->nla_len < NLA_HDRLEN + sizeof(struct dlm_lock_data)) {
+		log_error("nonfatal netlink error: attribute too short len %d",
+			  na->nla_len);
+		return;
+	}
+
+	process_timewarn((struct dlm_lock_data *) NLA_DATA(na));
+}
+
diff --git a/group/gfs_controld/plock.c b/group/dlm_controld/plock.c
similarity index 71%
copy from group/gfs_controld/plock.c
copy to group/dlm_controld/plock.c
index 42890df..a862356 100644
--- a/group/gfs_controld/plock.c
+++ b/group/dlm_controld/plock.c
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -10,52 +10,16 @@
 *******************************************************************************
 ******************************************************************************/
 
-#include <sys/types.h>
-#include <asm/types.h>
-#include <sys/uio.h>
-#include <netinet/in.h>
-#include <sys/socket.h>
-#include <sys/ioctl.h>
-#include <sys/stat.h>
-#include <sys/utsname.h>
-#include <sys/time.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <net/if.h>
-#include <stdio.h>
-#include <errno.h>
-#include <string.h>
-#include <stdlib.h>
-#include <stddef.h>
-#include <stdint.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <limits.h>
-#include <unistd.h>
-#include <openais/saAis.h>
-#include <openais/saCkpt.h>
-#include <linux/lock_dlm_plock.h>
-
-#include "lock_dlm.h"
+#include "dlm_daemon.h"
+#include "config.h"
+#include <linux/dlm_plock.h>
 
 #define PROC_MISC               "/proc/misc"
 #define PROC_DEVICES            "/proc/devices"
 #define MISC_NAME               "misc"
 #define CONTROL_DIR             "/dev/misc"
-#define CONTROL_NAME            "lock_dlm_plock"
+#define CONTROL_NAME            "dlm_plock"
 
-extern int our_nodeid;
-extern int message_flow_control_on;
-
-/* user configurable */
-extern int config_no_plock;
-extern uint32_t config_plock_rate_limit;
-extern uint32_t config_plock_ownership;
-extern uint32_t config_drop_resources_time;
-extern uint32_t config_drop_resources_count;
-extern uint32_t config_drop_resources_age;
-
-static int plocks_online = 0;
 static uint32_t plock_read_count;
 static uint32_t plock_recv_count;
 static uint32_t plock_rate_delays;
@@ -64,11 +28,14 @@ static struct timeval plock_recv_time;
 static struct timeval plock_rate_last;
 
 static int control_fd = -1;
-static SaCkptHandleT ckpt_handle;
+static SaCkptHandleT system_ckpt_handle;
 static SaCkptCallbacksT callbacks = { 0, 0 };
 static SaVersionT version = { 'B', 1, 1 };
 static char section_buf[1024 * 1024];
 static uint32_t section_len;
+static int need_fsid_translation = 0;
+
+extern int message_flow_control_on;
 
 struct pack_plock {
 	uint64_t start;
@@ -112,13 +79,21 @@ struct posix_lock {
 struct lock_waiter {
 	struct list_head	list;
 	uint32_t		flags;
-	struct gdlm_plock_info	info;
+	struct dlm_plock_info	info;
+};
+
+struct save_msg {
+	struct list_head list;
+	int nodeid;
+	int len;
+	int type;
+	char buf[0];
 };
 
 
-static void send_own(struct mountgroup *mg, struct resource *r, int owner);
-static void save_pending_plock(struct mountgroup *mg, struct resource *r,
-			       struct gdlm_plock_info *in);
+static void send_own(struct lockspace *ls, struct resource *r, int owner);
+static void save_pending_plock(struct lockspace *ls, struct resource *r,
+			       struct dlm_plock_info *in);
 
 
 static int got_unown(struct resource *r)
@@ -126,7 +101,7 @@ static int got_unown(struct resource *r)
 	return !!(r->flags & R_GOT_UNOWN);
 }
 
-static void info_bswap_out(struct gdlm_plock_info *i)
+static void info_bswap_out(struct dlm_plock_info *i)
 {
 	i->version[0]	= cpu_to_le32(i->version[0]);
 	i->version[1]	= cpu_to_le32(i->version[1]);
@@ -141,7 +116,7 @@ static void info_bswap_out(struct gdlm_plock_info *i)
 	i->owner	= cpu_to_le64(i->owner);
 }
 
-static void info_bswap_in(struct gdlm_plock_info *i)
+static void info_bswap_in(struct dlm_plock_info *i)
 {
 	i->version[0]	= le32_to_cpu(i->version[0]);
 	i->version[1]	= le32_to_cpu(i->version[1]);
@@ -159,11 +134,11 @@ static void info_bswap_in(struct gdlm_plock_info *i)
 static char *op_str(int optype)
 {
 	switch (optype) {
-	case GDLM_PLOCK_OP_LOCK:
+	case DLM_PLOCK_OP_LOCK:
 		return "LK";
-	case GDLM_PLOCK_OP_UNLOCK:
+	case DLM_PLOCK_OP_UNLOCK:
 		return "UN";
-	case GDLM_PLOCK_OP_GET:
+	case DLM_PLOCK_OP_GET:
 		return "GET";
 	default:
 		return "??";
@@ -172,7 +147,7 @@ static char *op_str(int optype)
 
 static char *ex_str(int optype, int ex)
 {
-	if (optype == GDLM_PLOCK_OP_UNLOCK || optype == GDLM_PLOCK_OP_GET)
+	if (optype == DLM_PLOCK_OP_UNLOCK || optype == DLM_PLOCK_OP_GET)
 		return "-";
 	if (ex)
 		return "WR";
@@ -207,10 +182,11 @@ static int get_proc_number(const char *file, const char *name, uint32_t *number)
 	return 0;
 }
 
-static int control_device_number(uint32_t *major, uint32_t *minor)
+static int control_device_number(const char *plock_misc_name,
+				 uint32_t *major, uint32_t *minor)
 {
 	if (!get_proc_number(PROC_DEVICES, MISC_NAME, major) ||
-	    !get_proc_number(PROC_MISC, GDLM_PLOCK_MISC_NAME, minor)) {
+	    !get_proc_number(PROC_MISC, plock_misc_name, minor)) {
 		*major = 0;
 		return 0;
 	}
@@ -277,7 +253,7 @@ static int create_control(const char *control, uint32_t major, uint32_t minor)
 	return 1;
 }
 
-static int open_control(void)
+static int open_control(const char *control_name, const char *plock_misc_name)
 {
 	char control[PATH_MAX];
 	uint32_t major = 0, minor = 0;
@@ -285,22 +261,20 @@ static int open_control(void)
 	if (control_fd != -1)
 		return 0;
 
-	snprintf(control, sizeof(control), "%s/%s", CONTROL_DIR, CONTROL_NAME);
+	snprintf(control, sizeof(control), "%s/%s", CONTROL_DIR, control_name);
 
-	if (!control_device_number(&major, &minor)) {
-		log_error("Is dlm missing from kernel?");
+	if (!control_device_number(plock_misc_name, &major, &minor))
 		return -1;
-	}
 
 	if (!control_exists(control, major, minor) &&
 	    !create_control(control, major, minor)) {
-		log_error("Failure to communicate with kernel lock_dlm");
+		log_error("Failure to create device file %s", control);
 		return -1;
 	}
 
 	control_fd = open(control, O_RDWR);
 	if (control_fd < 0) {
-		log_error("Failure to communicate with kernel lock_dlm: %s",
+		log_error("Failure to open device %s: %s", control,
 			  strerror(errno));
 		return -1;
 	}
@@ -308,6 +282,16 @@ static int open_control(void)
 	return 0;
 }
 
+/*
+ * In kernels before 2.6.26, plocks came from gfs2's lock_dlm module.
+ * Reading plocks from there as well should allow us to use cluster3
+ * on old (RHEL5) kernels.  In this case, the fsid we read in plock_info
+ * structs is the mountgroup id, which we need to translate to the ls id.
+ */
+
+#define OLD_CONTROL_NAME "lock_dlm_plock"
+#define OLD_PLOCK_MISC_NAME "lock_dlm_plock"
+
 int setup_plocks(void)
 {
 	SaAisErrorT err;
@@ -320,28 +304,55 @@ int setup_plocks(void)
 	gettimeofday(&plock_recv_time, NULL);
 	gettimeofday(&plock_rate_last, NULL);
 
-	if (config_no_plock)
-		goto control;
+	err = saCkptInitialize(&system_ckpt_handle, &callbacks, &version);
+	if (err != SA_AIS_OK) {
+		log_error("ckpt init error %d", err);
+		cfgd_enable_plock = 0;
 
-	err = saCkptInitialize(&ckpt_handle, &callbacks, &version);
-	if (err == SA_AIS_OK)
-		plocks_online = 1;
-	else
-		log_error("ckpt init error %d - plocks unavailable", err);
+		/* still try to open and read the control device so that we can
+		   send ENOSYS back to the kernel if it tries to do a plock */
+	}
 
- control:
-	rv = open_control();
-	if (rv)
-		return rv;
+
+	rv = open_control(CONTROL_NAME, DLM_PLOCK_MISC_NAME);
+	if (rv) {
+		log_debug("setup_plocks trying old lock_dlm interface");
+		rv = open_control(OLD_CONTROL_NAME, OLD_PLOCK_MISC_NAME);
+		if (rv) {
+			log_error("Is dlm missing from kernel?  No control device.");
+			return rv;
+		}
+		need_fsid_translation = 1;
+	}
 
 	log_debug("plocks %d", control_fd);
 	log_debug("plock cpg message size: %u bytes",
-		  (unsigned int) (sizeof(struct gdlm_header) +
-		                  sizeof(struct gdlm_plock_info)));
+		  (unsigned int) (sizeof(struct dlm_header) +
+		                  sizeof(struct dlm_plock_info)));
 
 	return control_fd;
 }
 
+static uint32_t mg_to_ls_id(uint32_t fsid)
+{
+	struct lockspace *ls;
+	int do_set = 1;
+
+ retry:
+	list_for_each_entry(ls, &lockspaces, list) {
+		if (ls->associated_mg_id == fsid)
+			return ls->global_id;
+	}
+
+	if (do_set) {
+		do_set = 0;
+		set_associated_id(fsid);
+		goto retry;
+	}
+
+	return fsid;
+}
+
 /* FIXME: unify these two */
 
 static unsigned long time_diff_ms(struct timeval *begin, struct timeval *end)
@@ -361,24 +372,24 @@ static uint64_t dt_usec(struct timeval *start, struct timeval *stop)
 	return dt;
 }
 
-static struct resource *search_resource(struct mountgroup *mg, uint64_t number)
+static struct resource *search_resource(struct lockspace *ls, uint64_t number)
 {
 	struct resource *r;
 
-	list_for_each_entry(r, &mg->resources, list) {
+	list_for_each_entry(r, &ls->plock_resources, list) {
 		if (r->number == number)
 			return r;
 	}
 	return NULL;
 }
 
-static int find_resource(struct mountgroup *mg, uint64_t number, int create,
+static int find_resource(struct lockspace *ls, uint64_t number, int create,
 			 struct resource **r_out)
 {
 	struct resource *r = NULL;
 	int rv = 0;
 
-	r = search_resource(mg, number);
+	r = search_resource(ls, number);
 	if (r)
 		goto out;
 
@@ -400,12 +411,12 @@ static int find_resource(struct mountgroup *mg, uint64_t number, int create,
 	INIT_LIST_HEAD(&r->waiters);
 	INIT_LIST_HEAD(&r->pending);
 
-	if (config_plock_ownership)
+	if (cfgd_plock_ownership)
 		r->owner = -1;
 	else
 		r->owner = 0;
 
-	list_add_tail(&r->list, &mg->resources);
+	list_add_tail(&r->list, &ls->plock_resources);
  out:
 	if (r)
 		gettimeofday(&r->last_access, NULL);
@@ -416,7 +427,7 @@ static int find_resource(struct mountgroup *mg, uint64_t number, int create,
 static void put_resource(struct resource *r)
 {
 	/* with ownership, resources are only freed via drop messages */
-	if (config_plock_ownership)
+	if (cfgd_plock_ownership)
 		return;
 
 	if (list_empty(&r->locks) && list_empty(&r->waiters)) {
@@ -429,8 +440,8 @@ static inline int ranges_overlap(uint64_t start1, uint64_t end1,
 				 uint64_t start2, uint64_t end2)
 {
 	if (end1 < start2 || start1 > end2)
-		return FALSE;
-	return TRUE;
+		return 0;
+	return 1;
 }
 
 /**
@@ -529,7 +540,7 @@ static int shrink_range(struct posix_lock *po, uint64_t start, uint64_t end)
 	return shrink_range2(&po->start, &po->end, start, end);
 }
 
-static int is_conflict(struct resource *r, struct gdlm_plock_info *in, int get)
+static int is_conflict(struct resource *r, struct dlm_plock_info *in, int get)
 {
 	struct posix_lock *po;
 
@@ -578,7 +589,7 @@ static int add_lock(struct resource *r, uint32_t nodeid, uint64_t owner,
    2. convert RE to RN range and mode */
 
 static int lock_case1(struct posix_lock *po, struct resource *r,
-		      struct gdlm_plock_info *in)
+		      struct dlm_plock_info *in)
 {
 	uint64_t start2, end2;
 	int rv;
@@ -605,7 +616,7 @@ static int lock_case1(struct posix_lock *po, struct resource *r,
    3. convert RE to RN range and mode */
 			 
 static int lock_case2(struct posix_lock *po, struct resource *r,
-		      struct gdlm_plock_info *in)
+		      struct dlm_plock_info *in)
 
 {
 	int rv;
@@ -627,8 +638,8 @@ static int lock_case2(struct posix_lock *po, struct resource *r,
 	return rv;
 }
 
-static int lock_internal(struct mountgroup *mg, struct resource *r,
-			 struct gdlm_plock_info *in)
+static int lock_internal(struct lockspace *ls, struct resource *r,
+			 struct dlm_plock_info *in)
 {
 	struct posix_lock *po, *safe;
 	int rv = 0;
@@ -690,8 +701,8 @@ static int lock_internal(struct mountgroup *mg, struct resource *r,
 
 }
 
-static int unlock_internal(struct mountgroup *mg, struct resource *r,
-			   struct gdlm_plock_info *in)
+static int unlock_internal(struct lockspace *ls, struct resource *r,
+			   struct dlm_plock_info *in)
 {
 	struct posix_lock *po, *safe;
 	int rv = 0;
@@ -754,8 +765,8 @@ static int unlock_internal(struct mountgroup *mg, struct resource *r,
 	return rv;
 }
 
-static int add_waiter(struct mountgroup *mg, struct resource *r,
-		      struct gdlm_plock_info *in)
+static int add_waiter(struct lockspace *ls, struct resource *r,
+		      struct dlm_plock_info *in)
 
 {
 	struct lock_waiter *w;
@@ -763,26 +774,29 @@ static int add_waiter(struct mountgroup *mg, struct resource *r,
 	w = malloc(sizeof(struct lock_waiter));
 	if (!w)
 		return -ENOMEM;
-	memcpy(&w->info, in, sizeof(struct gdlm_plock_info));
+	memcpy(&w->info, in, sizeof(struct dlm_plock_info));
 	list_add_tail(&w->list, &r->waiters);
 	return 0;
 }
 
-static void write_result(struct mountgroup *mg, struct gdlm_plock_info *in,
+static void write_result(struct lockspace *ls, struct dlm_plock_info *in,
 			 int rv)
 {
 	int err;
 
+	if (need_fsid_translation)
+		in->fsid = ls->associated_mg_id;
+
 	in->rv = rv;
-	err = write(control_fd, in, sizeof(struct gdlm_plock_info));
-	if (err != sizeof(struct gdlm_plock_info))
+	err = write(control_fd, in, sizeof(struct dlm_plock_info));
+	if (err != sizeof(struct dlm_plock_info))
 		log_error("plock result write err %d errno %d", err, errno);
 }
 
-static void do_waiters(struct mountgroup *mg, struct resource *r)
+static void do_waiters(struct lockspace *ls, struct resource *r)
 {
 	struct lock_waiter *w, *safe;
-	struct gdlm_plock_info *in;
+	struct dlm_plock_info *in;
 	int rv;
 
 	list_for_each_entry_safe(w, safe, &r->waiters, list) {
@@ -794,21 +808,21 @@ static void do_waiters(struct mountgroup *mg, struct resource *r)
 		list_del(&w->list);
 
 		/*
-		log_group(mg, "take waiter %llx %llx-%llx %d/%u/%llx",
+		log_group(ls, "take waiter %llx %llx-%llx %d/%u/%llx",
 			  in->number, in->start, in->end,
 			  in->nodeid, in->pid, in->owner);
 		*/
 
-		rv = lock_internal(mg, r, in);
+		rv = lock_internal(ls, r, in);
 
 		if (in->nodeid == our_nodeid)
-			write_result(mg, in, rv);
+			write_result(ls, in, rv);
 
 		free(w);
 	}
 }
 
-static void do_lock(struct mountgroup *mg, struct gdlm_plock_info *in,
+static void do_lock(struct lockspace *ls, struct dlm_plock_info *in,
 		    struct resource *r)
 {
 	int rv;
@@ -817,39 +831,39 @@ static void do_lock(struct mountgroup *mg, struct gdlm_plock_info *in,
 		if (!in->wait)
 			rv = -EAGAIN;
 		else {
-			rv = add_waiter(mg, r, in);
+			rv = add_waiter(ls, r, in);
 			if (rv)
 				goto out;
 			rv = -EINPROGRESS;
 		}
 	} else
-		rv = lock_internal(mg, r, in);
+		rv = lock_internal(ls, r, in);
 
  out:
 	if (in->nodeid == our_nodeid && rv != -EINPROGRESS)
-		write_result(mg, in, rv);
+		write_result(ls, in, rv);
 
-	do_waiters(mg, r);
+	do_waiters(ls, r);
 	put_resource(r);
 }
 
-static void do_unlock(struct mountgroup *mg, struct gdlm_plock_info *in,
+static void do_unlock(struct lockspace *ls, struct dlm_plock_info *in,
 		      struct resource *r)
 {
 	int rv;
 
-	rv = unlock_internal(mg, r, in);
+	rv = unlock_internal(ls, r, in);
 
 	if (in->nodeid == our_nodeid)
-		write_result(mg, in, rv);
+		write_result(ls, in, rv);
 
-	do_waiters(mg, r);
+	do_waiters(ls, r);
 	put_resource(r);
 }
 
 /* we don't even get to this function if the getlk isn't from us */
 
-static void do_get(struct mountgroup *mg, struct gdlm_plock_info *in,
+static void do_get(struct lockspace *ls, struct dlm_plock_info *in,
 		   struct resource *r)
 {
 	int rv;
@@ -859,32 +873,52 @@ static void do_get(struct mountgroup *mg, struct gdlm_plock_info *in,
 	else
 		rv = 0;
 
-	write_result(mg, in, rv);
+	write_result(ls, in, rv);
+}
+
+static void save_message(struct lockspace *ls, struct dlm_header *hd, int len,
+			 int from, int type)
+{
+	struct save_msg *sm;
+
+	sm = malloc(sizeof(struct save_msg) + len);
+	if (!sm)
+		return;
+	memset(sm, 0, sizeof(struct save_msg) + len);
+
+	memcpy(&sm->buf, hd, len);
+	sm->type = type;
+	sm->len = len;
+	sm->nodeid = from;
+
+	log_group(ls, "save %s from %d len %d", msg_name(type), from, len);
+
+	list_add_tail(&sm->list, &ls->saved_messages);
 }
 
-static void __receive_plock(struct mountgroup *mg, struct gdlm_plock_info *in,
+static void __receive_plock(struct lockspace *ls, struct dlm_plock_info *in,
 			    int from, struct resource *r)
 {
 	switch (in->optype) {
-	case GDLM_PLOCK_OP_LOCK:
-		mg->last_plock_time = time(NULL);
-		do_lock(mg, in, r);
+	case DLM_PLOCK_OP_LOCK:
+		ls->last_plock_time = time(NULL);
+		do_lock(ls, in, r);
 		break;
-	case GDLM_PLOCK_OP_UNLOCK:
-		mg->last_plock_time = time(NULL);
-		do_unlock(mg, in, r);
+	case DLM_PLOCK_OP_UNLOCK:
+		ls->last_plock_time = time(NULL);
+		do_unlock(ls, in, r);
 		break;
-	case GDLM_PLOCK_OP_GET:
-		do_get(mg, in, r);
+	case DLM_PLOCK_OP_GET:
+		do_get(ls, in, r);
 		break;
 	default:
 		log_error("receive_plock from %d optype %d", from, in->optype);
 		if (from == our_nodeid)
-			write_result(mg, in, -EINVAL);
+			write_result(ls, in, -EINVAL);
 	}
 }
 
-/* When mg members receive our options message (for our mount), one of them
+/* When ls members receive our options message (for our mount), one of them
    saves all plock state received to that point in a checkpoint and then sends
    us our journals message.  We know to retrieve the plock state from the
    checkpoint when we receive our journals message.  Any plocks messages that
@@ -894,19 +928,19 @@ static void __receive_plock(struct mountgroup *mg, struct gdlm_plock_info *in,
    set save_plocks (when we see our options message) can be ignored because it
    should be reflected in the checkpointed state. */
 
-static void _receive_plock(struct mountgroup *mg, char *buf, int len, int from)
+static void _receive_plock(struct lockspace *ls, struct dlm_header *hd, int len)
 {
-	struct gdlm_plock_info info;
-	struct gdlm_header *hd = (struct gdlm_header *) buf;
+	struct dlm_plock_info info;
 	struct resource *r = NULL;
 	struct timeval now;
 	uint64_t usec;
+	int from = hd->nodeid;
 	int rv, create;
 
-	memcpy(&info, buf + sizeof(struct gdlm_header), sizeof(info));
+	memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
 	info_bswap_in(&info);
 
-	log_plock(mg, "receive plock %llx %s %s %llx-%llx %d/%u/%llx w %d",
+	log_plock(ls, "receive plock %llx %s %s %llx-%llx %d/%u/%llx w %d",
 		  (unsigned long long)info.number,
 		  op_str(info.optype),
 		  ex_str(info.optype, info.ex),
@@ -918,12 +952,12 @@ static void _receive_plock(struct mountgroup *mg, char *buf, int len, int from)
 	if (!(plock_recv_count % 1000)) {
 		gettimeofday(&now, NULL);
 		usec = dt_usec(&plock_recv_time, &now);
-		log_group(mg, "plock_recv_count %u time %.3f s",
+		log_group(ls, "plock_recv_count %u time %.3f s",
 			  plock_recv_count, usec * 1.e-6);
 		plock_recv_time = now;
 	}
 
-	if (info.optype == GDLM_PLOCK_OP_GET && from != our_nodeid)
+	if (info.optype == DLM_PLOCK_OP_GET && from != our_nodeid)
 		return;
 
 	if (from != hd->nodeid || from != info.nodeid) {
@@ -932,11 +966,11 @@ static void _receive_plock(struct mountgroup *mg, char *buf, int len, int from)
 		return;
 	}
 
-	create = !config_plock_ownership;
+	create = !cfgd_plock_ownership;
 
-	rv = find_resource(mg, info.number, create, &r);
+	rv = find_resource(ls, info.number, create, &r);
 
-	if (rv && config_plock_ownership) {
+	if (rv && cfgd_plock_ownership) {
 		/* There must have been a race with a drop, so we need to
 		   ignore this plock op which will be resent.  If we're the one
 		   who sent the plock, we need to send_own() and put it on the
@@ -948,11 +982,11 @@ static void _receive_plock(struct mountgroup *mg, char *buf, int len, int from)
 		if (from != our_nodeid)
 			return;
 
-		rv = find_resource(mg, info.number, 1, &r);
+		rv = find_resource(ls, info.number, 1, &r);
 		if (rv)
 			return;
-		send_own(mg, r, our_nodeid);
-		save_pending_plock(mg, r, &info);
+		send_own(ls, r, our_nodeid);
+		save_pending_plock(ls, r, &info);
 		return;
 	}
 	if (rv) {
@@ -987,14 +1021,14 @@ static void _receive_plock(struct mountgroup *mg, char *buf, int len, int from)
 	   we're the owner of r. */
 
 	if (!r->owner) {
-		__receive_plock(mg, &info, from, r);
+		__receive_plock(ls, &info, from, r);
 
 	} else if (r->owner == -1) {
 		log_debug("receive_plock from %d r %llx owner %d", from,
 			  (unsigned long long)info.number, r->owner);
 
 		if (from == our_nodeid)
-			save_pending_plock(mg, r, &info);
+			save_pending_plock(ls, r, &info);
 
 	} else if (r->owner != our_nodeid) {
 		/* might happen, if frequent change to log_debug */
@@ -1002,7 +1036,7 @@ static void _receive_plock(struct mountgroup *mg, char *buf, int len, int from)
 			  (unsigned long long)info.number, r->owner);
 
 		if (from == our_nodeid)
-			save_pending_plock(mg, r, &info);
+			save_pending_plock(ls, r, &info);
 
 	} else if (r->owner == our_nodeid) {
 		/* might happen, if frequent change to log_debug */
@@ -1010,33 +1044,28 @@ static void _receive_plock(struct mountgroup *mg, char *buf, int len, int from)
 			  (unsigned long long)info.number, r->owner);
 
 		if (from == our_nodeid)
-			__receive_plock(mg, &info, from, r);
+			__receive_plock(ls, &info, from, r);
 	}
 }
 
-void receive_plock(struct mountgroup *mg, char *buf, int len, int from)
+void receive_plock(struct lockspace *ls, struct dlm_header *hd, int len)
 {
-	if (mg->save_plocks) {
-		save_message(mg, buf, len, from, MSG_PLOCK);
-		return;
-	}
-
-	if (!mg->got_our_journals) {
-		log_group(mg, "not saving plock messages yet");
+	if (ls->save_plocks) {
+		save_message(ls, hd, len, hd->nodeid, DLM_MSG_PLOCK);
 		return;
 	}
 
-	_receive_plock(mg, buf, len, from);
+	_receive_plock(ls, hd, len);
 }
 
-static int send_struct_info(struct mountgroup *mg, struct gdlm_plock_info *in,
+static int send_struct_info(struct lockspace *ls, struct dlm_plock_info *in,
 			    int msg_type)
 {
+	struct dlm_header *hd;
+	int rv = 0, len;
 	char *buf;
-	int rv, len;
-	struct gdlm_header *hd;
 
-	len = sizeof(struct gdlm_header) + sizeof(struct gdlm_plock_info);
+	len = sizeof(struct dlm_header) + sizeof(struct dlm_plock_info);
 	buf = malloc(len);
 	if (!buf) {
 		rv = -ENOMEM;
@@ -1046,31 +1075,29 @@ static int send_struct_info(struct mountgroup *mg, struct gdlm_plock_info *in,
 
 	info_bswap_out(in);
 
-	hd = (struct gdlm_header *)buf;
+	hd = (struct dlm_header *)buf;
 	hd->type = msg_type;
-	hd->nodeid = our_nodeid;
-	hd->to_nodeid = 0;
 
-	memcpy(buf + sizeof(struct gdlm_header), in, sizeof(*in));
+	memcpy(buf + sizeof(struct dlm_header), in, sizeof(*in));
 
-	rv = send_group_message(mg, len, buf);
+	dlm_send_message(ls, buf, len);
 
 	free(buf);
  out:
 	if (rv)
-		log_error("send plock message error %d", rv);
+		log_error("send_struct_info error %d", rv);
 	return rv;
 }
 
-static void send_plock(struct mountgroup *mg, struct resource *r,
-		       struct gdlm_plock_info *in)
+static void send_plock(struct lockspace *ls, struct resource *r,
+		       struct dlm_plock_info *in)
 {
-	send_struct_info(mg, in, MSG_PLOCK);
+	send_struct_info(ls, in, DLM_MSG_PLOCK);
 }
 
-static void send_own(struct mountgroup *mg, struct resource *r, int owner)
+static void send_own(struct lockspace *ls, struct resource *r, int owner)
 {
-	struct gdlm_plock_info info;
+	struct dlm_plock_info info;
 
 	/* if we've already sent an own message for this resource,
 	   (pending list is not empty), then we shouldn't send another */
@@ -1085,12 +1112,12 @@ static void send_own(struct mountgroup *mg, struct resource *r, int owner)
 	info.number = r->number;
 	info.nodeid = owner;
 
-	send_struct_info(mg, &info, MSG_PLOCK_OWN);
+	send_struct_info(ls, &info, DLM_MSG_PLOCK_OWN);
 }
 
-static void send_syncs(struct mountgroup *mg, struct resource *r)
+static void send_syncs(struct lockspace *ls, struct resource *r)
 {
-	struct gdlm_plock_info info;
+	struct dlm_plock_info info;
 	struct posix_lock *po;
 	struct lock_waiter *w;
 	int rv;
@@ -1105,7 +1132,7 @@ static void send_syncs(struct mountgroup *mg, struct resource *r)
 		info.pid       = po->pid;
 		info.ex        = po->ex;
 
-		rv = send_struct_info(mg, &info, MSG_PLOCK_SYNC_LOCK);
+		rv = send_struct_info(ls, &info, DLM_MSG_PLOCK_SYNC_LOCK);
 		if (rv)
 			goto out;
 
@@ -1115,7 +1142,7 @@ static void send_syncs(struct mountgroup *mg, struct resource *r)
 	list_for_each_entry(w, &r->waiters, list) {
 		memcpy(&info, &w->info, sizeof(info));
 
-		rv = send_struct_info(mg, &info, MSG_PLOCK_SYNC_WAITER);
+		rv = send_struct_info(ls, &info, DLM_MSG_PLOCK_SYNC_WAITER);
 		if (rv)
 			goto out;
 
@@ -1125,21 +1152,21 @@ static void send_syncs(struct mountgroup *mg, struct resource *r)
 	return;
 }
 
-static void send_drop(struct mountgroup *mg, struct resource *r)
+static void send_drop(struct lockspace *ls, struct resource *r)
 {
-	struct gdlm_plock_info info;
+	struct dlm_plock_info info;
 
 	memset(&info, 0, sizeof(info));
 	info.number = r->number;
 
-	send_struct_info(mg, &info, MSG_PLOCK_DROP);
+	send_struct_info(ls, &info, DLM_MSG_PLOCK_DROP);
 }
 
 /* plock op can't be handled until we know the owner value of the resource,
    so the op is saved on the pending list until the r owner is established */
 
-static void save_pending_plock(struct mountgroup *mg, struct resource *r,
-			       struct gdlm_plock_info *in)
+static void save_pending_plock(struct lockspace *ls, struct resource *r,
+			       struct dlm_plock_info *in)
 {
 	struct lock_waiter *w;
 
@@ -1148,19 +1175,19 @@ static void save_pending_plock(struct mountgroup *mg, struct resource *r,
 		log_error("save_pending_plock no mem");
 		return;
 	}
-	memcpy(&w->info, in, sizeof(struct gdlm_plock_info));
+	memcpy(&w->info, in, sizeof(struct dlm_plock_info));
 	list_add_tail(&w->list, &r->pending);
 }
 
 /* plock ops are on pending list waiting for ownership to be established.
    owner has now become us, so add these plocks to r */
 
-static void add_pending_plocks(struct mountgroup *mg, struct resource *r)
+static void add_pending_plocks(struct lockspace *ls, struct resource *r)
 {
 	struct lock_waiter *w, *safe;
 
 	list_for_each_entry_safe(w, safe, &r->pending, list) {
-		__receive_plock(mg, &w->info, our_nodeid, r);
+		__receive_plock(ls, &w->info, our_nodeid, r);
 		list_del(&w->list);
 		free(w);
 	}
@@ -1169,32 +1196,32 @@ static void add_pending_plocks(struct mountgroup *mg, struct resource *r)
 /* plock ops are on pending list waiting for ownership to be established.
    owner has now become 0, so send these plocks to everyone */
 
-static void send_pending_plocks(struct mountgroup *mg, struct resource *r)
+static void send_pending_plocks(struct lockspace *ls, struct resource *r)
 {
 	struct lock_waiter *w, *safe;
 
 	list_for_each_entry_safe(w, safe, &r->pending, list) {
-		send_plock(mg, r, &w->info);
+		send_plock(ls, r, &w->info);
 		list_del(&w->list);
 		free(w);
 	}
 }
 
-static void _receive_own(struct mountgroup *mg, char *buf, int len, int from)
+static void _receive_own(struct lockspace *ls, struct dlm_header *hd, int len)
 {
-	struct gdlm_header *hd = (struct gdlm_header *) buf;
-	struct gdlm_plock_info info;
+	struct dlm_plock_info info;
 	struct resource *r;
 	int should_not_happen = 0;
+	int from = hd->nodeid;
 	int rv;
 
-	memcpy(&info, buf + sizeof(struct gdlm_header), sizeof(info));
+	memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
 	info_bswap_in(&info);
 
-	log_plock(mg, "receive own %llx from %u owner %u",
+	log_plock(ls, "receive own %llx from %u owner %u",
 		  (unsigned long long)info.number, hd->nodeid, info.nodeid);
 
-	rv = find_resource(mg, info.number, 1, &r);
+	rv = find_resource(ls, info.number, 1, &r);
 	if (rv)
 		return;
 
@@ -1223,11 +1250,11 @@ static void _receive_own(struct mountgroup *mg, char *buf, int len, int from)
 			if (r->owner == -1) {
 				/* we have gained ownership */
 				r->owner = our_nodeid;
-				add_pending_plocks(mg, r);
+				add_pending_plocks(ls, r);
 			} else if (r->owner == our_nodeid) {
 				should_not_happen = 1;
 			} else if (r->owner == 0) {
-				send_pending_plocks(mg, r);
+				send_pending_plocks(ls, r);
 			} else {
 				/* resource is owned by other node;
 				   they should set owner to 0 shortly */
@@ -1257,7 +1284,7 @@ static void _receive_own(struct mountgroup *mg, char *buf, int len, int from)
 			} else {
 				r->owner = 0;
 				r->flags |= R_GOT_UNOWN;
-				send_pending_plocks(mg, r);
+				send_pending_plocks(ls, r);
 			}
 
 		} else if (info.nodeid == from) {
@@ -1269,8 +1296,8 @@ static void _receive_own(struct mountgroup *mg, char *buf, int len, int from)
 			} else if (r->owner == our_nodeid) {
 				/* we relinquish our ownership: sync our local
 				   plocks to everyone, then set owner to 0 */
-				send_syncs(mg, r);
-				send_own(mg, r, 0);
+				send_syncs(ls, r);
+				send_own(ls, r, 0);
 				/* we need to set owner to 0 here because
 				   local ops may arrive before we receive
 				   our send_own message and can't be added
@@ -1300,17 +1327,17 @@ static void _receive_own(struct mountgroup *mg, char *buf, int len, int from)
 	}
 }
 
-void receive_own(struct mountgroup *mg, char *buf, int len, int from)
+void receive_own(struct lockspace *ls, struct dlm_header *hd, int len)
 {
-	if (mg->save_plocks) {
-		save_message(mg, buf, len, from, MSG_PLOCK_OWN);
+	if (ls->save_plocks) {
+		save_message(ls, hd, len, hd->nodeid, DLM_MSG_PLOCK_OWN);
 		return;
 	}
 
-	_receive_own(mg, buf, len, from);
+	_receive_own(ls, hd, len);
 }
 
-static void clear_syncing_flag(struct resource *r, struct gdlm_plock_info *in)
+static void clear_syncing_flag(struct resource *r, struct dlm_plock_info *in)
 {
 	struct posix_lock *po;
 	struct lock_waiter *w;
@@ -1347,22 +1374,22 @@ static void clear_syncing_flag(struct resource *r, struct gdlm_plock_info *in)
 		  in->nodeid, in->pid, (unsigned long long)in->owner);
 }
 
-static void _receive_sync(struct mountgroup *mg, char *buf, int len, int from)
+static void _receive_sync(struct lockspace *ls, struct dlm_header *hd, int len)
 {
-	struct gdlm_plock_info info;
-	struct gdlm_header *hd = (struct gdlm_header *) buf;
+	struct dlm_plock_info info;
 	struct resource *r;
+	int from = hd->nodeid;
 	int rv;
 
-	memcpy(&info, buf + sizeof(struct gdlm_header), sizeof(info));
+	memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
 	info_bswap_in(&info);
 
-	log_plock(mg, "receive sync %llx from %u %s %llx-%llx %d/%u/%llx",
+	log_plock(ls, "receive sync %llx from %u %s %llx-%llx %d/%u/%llx",
 		  (unsigned long long)info.number, from, info.ex ? "WR" : "RD",
 		  (unsigned long long)info.start, (unsigned long long)info.end,
 		  info.nodeid, info.pid, (unsigned long long)info.owner);
 
-	rv = find_resource(mg, info.number, 0, &r);
+	rv = find_resource(ls, info.number, 0, &r);
 	if (rv) {
 		log_error("receive_sync no r %llx from %d", info.number, from);
 		return;
@@ -1374,38 +1401,37 @@ static void _receive_sync(struct mountgroup *mg, char *buf, int len, int from)
 		return;
 	}
 
-	if (hd->type == MSG_PLOCK_SYNC_LOCK)
+	if (hd->type == DLM_MSG_PLOCK_SYNC_LOCK)
 		add_lock(r, info.nodeid, info.owner, info.pid, !info.ex, 
 			 info.start, info.end);
-	else if (hd->type == MSG_PLOCK_SYNC_WAITER)
-		add_waiter(mg, r, &info);
+	else if (hd->type == DLM_MSG_PLOCK_SYNC_WAITER)
+		add_waiter(ls, r, &info);
 }
 
-void receive_sync(struct mountgroup *mg, char *buf, int len, int from)
+void receive_sync(struct lockspace *ls, struct dlm_header *hd, int len)
 {
-	struct gdlm_header *hd = (struct gdlm_header *) buf;
-
-	if (mg->save_plocks) {
-		save_message(mg, buf, len, from, hd->type);
+	if (ls->save_plocks) {
+		save_message(ls, hd, len, hd->nodeid, hd->type);
 		return;
 	}
 
-	_receive_sync(mg, buf, len, from);
+	_receive_sync(ls, hd, len);
 }
 
-static void _receive_drop(struct mountgroup *mg, char *buf, int len, int from)
+static void _receive_drop(struct lockspace *ls, struct dlm_header *hd, int len)
 {
-	struct gdlm_plock_info info;
+	struct dlm_plock_info info;
 	struct resource *r;
+	int from = hd->nodeid;
 	int rv;
 
-	memcpy(&info, buf + sizeof(struct gdlm_header), sizeof(info));
+	memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
 	info_bswap_in(&info);
 
-	log_plock(mg, "receive drop %llx from %u",
+	log_plock(ls, "receive drop %llx from %u",
 		  (unsigned long long)info.number, from);
 
-	rv = find_resource(mg, info.number, 0, &r);
+	rv = find_resource(ls, info.number, 0, &r);
 	if (rv) {
 		/* we'll find no r if two nodes sent drop at once */
 		log_debug("receive_drop from %d no r %llx", from,
@@ -1444,14 +1470,14 @@ static void _receive_drop(struct mountgroup *mg, char *buf, int len, int from)
 	}
 }
 
-void receive_drop(struct mountgroup *mg, char *buf, int len, int from)
+void receive_drop(struct lockspace *ls, struct dlm_header *hd, int len)
 {
-	if (mg->save_plocks) {
-		save_message(mg, buf, len, from, MSG_PLOCK_DROP);
+	if (ls->save_plocks) {
+		save_message(ls, hd, len, hd->nodeid, DLM_MSG_PLOCK_DROP);
 		return;
 	}
 
-	_receive_drop(mg, buf, len, from);
+	_receive_drop(ls, hd, len);
 }
 
 /* We only drop resources from the unowned state to simplify things.
@@ -1460,7 +1486,7 @@ void receive_drop(struct mountgroup *mg, char *buf, int len, int from)
 /* FIXME: in the transition from owner = us, to owner = 0, to drop;
    we want the second period to be shorter than the first */
 
-static int drop_resources(struct mountgroup *mg)
+static int drop_resources(struct lockspace *ls)
 {
 	struct resource *r;
 	struct timeval now;
@@ -1470,21 +1496,21 @@ static int drop_resources(struct mountgroup *mg)
 
 	/* try to drop the oldest, unused resources */
 
-	list_for_each_entry_reverse(r, &mg->resources, list) {
-		if (count >= config_drop_resources_count)
+	list_for_each_entry_reverse(r, &ls->plock_resources, list) {
+		if (count >= cfgd_drop_resources_count)
 			break;
 		if (r->owner && r->owner != our_nodeid)
 			continue;
 		if (time_diff_ms(&r->last_access, &now) <
-		    config_drop_resources_age)
+		    cfgd_drop_resources_age)
 			continue;
 
 		if (list_empty(&r->locks) && list_empty(&r->waiters)) {
 			if (r->owner == our_nodeid) {
-				send_own(mg, r, 0);
+				send_own(ls, r, 0);
 				r->owner = 0;
 			} else if (r->owner == 0 && got_unown(r)) {
-				send_drop(mg, r);
+				send_drop(ls, r);
 			}
 
 			count++;
@@ -1494,36 +1520,53 @@ static int drop_resources(struct mountgroup *mg)
 	return 0;
 }
 
-int process_plocks(void)
+int limit_plocks(void)
 {
-	struct mountgroup *mg;
-	struct resource *r;
-	struct gdlm_plock_info info;
 	struct timeval now;
-	uint64_t usec;
-	int rv;
 
 	/* Don't send more messages while the cpg message queue is backed up */
 
 	if (message_flow_control_on) {
 		update_flow_control_status();
 		if (message_flow_control_on)
-			return -EBUSY;
+			return 1;
 	}
 
+	if (!cfgd_plock_rate_limit || !plock_read_count)
+		return 0;
+
 	gettimeofday(&now, NULL);
 
-	/* Every N ops we check how long it's taken to do those N ops.
-	   If it's less than 1000 ms, we don't take any more. */
+	/* Every time a plock op is read from the kernel, we increment
+	   plock_read_count.  After every cfgd_plock_rate_limit (N) reads,
+	   we check the time it's taken to do those N; if the time is less than
+	   a second, then we delay reading any more until a second is up.
+	   This way we read a max of N ops from the kernel every second. */
 
-	if (config_plock_rate_limit && plock_read_count &&
-	    !(plock_read_count % config_plock_rate_limit)) {
+	if (!(plock_read_count % cfgd_plock_rate_limit)) {
 		if (time_diff_ms(&plock_rate_last, &now) < 1000) {
 			plock_rate_delays++;
-			return -EBUSY;
+			return 2;
 		}
 		plock_rate_last = now;
 	}
+	return 0;
+}
+
+void process_plocks(int ci)
+{
+	struct lockspace *ls;
+	struct resource *r;
+	struct dlm_plock_info info;
+	struct timeval now;
+	uint64_t usec;
+	int rv;
+
+	if (limit_plocks()) {
+		poll_ignore_plock = 1;
+		client_ignore(plock_ci, plock_fd);
+		return;
+	}
 
 	memset(&info, 0, sizeof(info));
 
@@ -1531,25 +1574,28 @@ int process_plocks(void)
 	if (rv < 0) {
 		log_debug("process_plocks: read error %d fd %d\n",
 			  errno, control_fd);
-		return 0;
+		return;
 	}
 
 	/* kernel doesn't set the nodeid field */
 	info.nodeid = our_nodeid;
 
-	if (!plocks_online) {
+	if (!cfgd_enable_plock) {
 		rv = -ENOSYS;
 		goto fail;
 	}
 
-	mg = find_mg_id(info.fsid);
-	if (!mg) {
-		log_debug("process_plocks: no mg id %x", info.fsid);
+	if (need_fsid_translation)
+		info.fsid = mg_to_ls_id(info.fsid);
+
+	ls = find_ls_id(info.fsid);
+	if (!ls) {
+		log_debug("process_plocks: no ls id %x", info.fsid);
 		rv = -EEXIST;
 		goto fail;
 	}
 
-	log_plock(mg, "read plock %llx %s %s %llx-%llx %d/%u/%llx w %d",
+	log_plock(ls, "read plock %llx %s %s %llx-%llx %d/%u/%llx w %d",
 		  (unsigned long long)info.number,
 		  op_str(info.optype),
 		  ex_str(info.optype, info.ex),
@@ -1561,71 +1607,72 @@ int process_plocks(void)
 	plock_read_count++;
 	if (!(plock_read_count % 1000)) {
 		usec = dt_usec(&plock_read_time, &now) ;
-		log_group(mg, "plock_read_count %u time %.3f s delays %u",
+		log_group(ls, "plock_read_count %u time %.3f s delays %u",
 			  plock_read_count, usec * 1.e-6, plock_rate_delays);
 		plock_read_time = now;
 		plock_rate_delays = 0;
 	}
 
-	rv = find_resource(mg, info.number, 1, &r);
+	rv = find_resource(ls, info.number, 1, &r);
 	if (rv)
 		goto fail;
 
 	if (r->owner == 0) {
 		/* plock state replicated on all nodes */
-		send_plock(mg, r, &info);
+		send_plock(ls, r, &info);
 
 	} else if (r->owner == our_nodeid) {
 		/* we are the owner of r, so our plocks are local */
-		__receive_plock(mg, &info, our_nodeid, r);
+		__receive_plock(ls, &info, our_nodeid, r);
 
 	} else {
 		/* r owner is -1: r is new, try to become the owner;
 		   r owner > 0: tell other owner to give up ownership;
 		   both done with a message trying to set owner to ourself */
-		send_own(mg, r, our_nodeid);
-		save_pending_plock(mg, r, &info);
+		send_own(ls, r, our_nodeid);
+		save_pending_plock(ls, r, &info);
 	}
 
-	if (config_plock_ownership &&
-	    time_diff_ms(&mg->drop_resources_last, &now) >=
-	    		 config_drop_resources_time) {
-		mg->drop_resources_last = now;
-		drop_resources(mg);
+	if (cfgd_plock_ownership &&
+	    time_diff_ms(&ls->drop_resources_last, &now) >=
+	    		 cfgd_drop_resources_time) {
+		ls->drop_resources_last = now;
+		drop_resources(ls);
 	}
 
-	return 0;
+	return;
 
  fail:
 	info.rv = rv;
-	rv = write(control_fd, &info, sizeof(info));
-
-	return 0;
+	write(control_fd, &info, sizeof(info));
 }
 
-void process_saved_plocks(struct mountgroup *mg)
+void process_saved_plocks(struct lockspace *ls)
 {
 	struct save_msg *sm, *sm2;
+	struct dlm_header *hd;
 
-	if (list_empty(&mg->saved_messages))
+	if (list_empty(&ls->saved_messages))
 		return;
 
-	log_group(mg, "process_saved_plocks");
+	log_group(ls, "process_saved_plocks");
+
+	list_for_each_entry_safe(sm, sm2, &ls->saved_messages, list) {
+		hd = (struct dlm_header *)sm->buf;
 
-	list_for_each_entry_safe(sm, sm2, &mg->saved_messages, list) {
 		switch (sm->type) {
-		case MSG_PLOCK:
-			_receive_plock(mg, sm->buf, sm->len, sm->nodeid);
+		case DLM_MSG_PLOCK:
+			_receive_plock(ls, hd, sm->len);
 			break;
-		case MSG_PLOCK_OWN:
-			_receive_own(mg, sm->buf, sm->len, sm->nodeid);
+		case DLM_MSG_PLOCK_OWN:
+			_receive_own(ls, hd, sm->len);
 			break;
-		case MSG_PLOCK_DROP:
-			_receive_drop(mg, sm->buf, sm->len, sm->nodeid);
+		case DLM_MSG_PLOCK_DROP:
+			_receive_drop(ls, hd, sm->len);
 			break;
-		case MSG_PLOCK_SYNC_LOCK:
-		case MSG_PLOCK_SYNC_WAITER:
-			_receive_sync(mg, sm->buf, sm->len, sm->nodeid);
+		case DLM_MSG_PLOCK_SYNC_LOCK:
+		case DLM_MSG_PLOCK_SYNC_WAITER:
+			_receive_sync(ls, hd, sm->len);
 			break;
 		default:
 			continue;
@@ -1638,14 +1685,13 @@ void process_saved_plocks(struct mountgroup *mg)
 
 void plock_exit(void)
 {
-	if (plocks_online)
-		saCkptFinalize(ckpt_handle);
+	saCkptFinalize(system_ckpt_handle);
 }
 
 /* locks still marked SYNCING should not go into the ckpt; the new node
    will get those locks by receiving PLOCK_SYNC messages */
 
-static void pack_section_buf(struct mountgroup *mg, struct resource *r)
+static void pack_section_buf(struct lockspace *ls, struct resource *r)
 {
 	struct pack_plock *pp;
 	struct posix_lock *po;
@@ -1689,7 +1735,7 @@ static void pack_section_buf(struct mountgroup *mg, struct resource *r)
 	section_len = count * sizeof(struct pack_plock);
 }
 
-static int unpack_section_buf(struct mountgroup *mg, char *numbuf, int buflen)
+static int unpack_section_buf(struct lockspace *ls, char *numbuf, int buflen)
 {
 	struct pack_plock *pp;
 	struct posix_lock *po;
@@ -1710,10 +1756,7 @@ static int unpack_section_buf(struct mountgroup *mg, char *numbuf, int buflen)
 	INIT_LIST_HEAD(&r->waiters);
 	INIT_LIST_HEAD(&r->pending);
 
-	if (config_plock_ownership)
-		sscanf(numbuf, "r%llu.%d", &num, &owner);
-	else
-		sscanf(numbuf, "r%llu", &num);
+	sscanf(numbuf, "r%llu.%d", &num, &owner);
 
 	r->number = num;
 	r->owner = owner;
@@ -1744,46 +1787,51 @@ static int unpack_section_buf(struct mountgroup *mg, char *numbuf, int buflen)
 		pp++;
 	}
 
-	list_add_tail(&r->list, &mg->resources);
+	list_add_tail(&r->list, &ls->plock_resources);
 	return 0;
 }
 
-int _unlink_checkpoint(struct mountgroup *mg, SaNameT *name)
+/* If we are the new ckpt_node, we'll be unlinking a ckpt that we don't
+   have open, which was created by the previous ckpt_node.  The previous
+   ckpt_node should have closed the ckpt in set_plock_ckpt_node() so it
+   will go away when we unlink it here. */
+
+static int _unlink_checkpoint(struct lockspace *ls, SaNameT *name)
 {
 	SaCkptCheckpointHandleT h;
 	SaCkptCheckpointDescriptorT s;
 	SaAisErrorT rv;
 	int ret = 0;
 
-	h = (SaCkptCheckpointHandleT) mg->cp_handle;
-	log_group(mg, "unlink ckpt %llx", (unsigned long long)h);
+	h = (SaCkptCheckpointHandleT) ls->plock_ckpt_handle;
+	log_group(ls, "unlink ckpt %llx", (unsigned long long)h);
 
  unlink_retry:
-	rv = saCkptCheckpointUnlink(ckpt_handle, name);
+	rv = saCkptCheckpointUnlink(system_ckpt_handle, name);
 	if (rv == SA_AIS_ERR_TRY_AGAIN) {
-		log_group(mg, "unlink ckpt retry");
+		log_group(ls, "unlink ckpt retry");
 		sleep(1);
 		goto unlink_retry;
 	}
 	if (rv == SA_AIS_OK)
 		goto out_close;
 
-	log_error("unlink ckpt error %d %s", rv, mg->name);
+	log_error("unlink ckpt error %d %s", rv, ls->name);
 	ret = -1;
 
  status_retry:
 	rv = saCkptCheckpointStatusGet(h, &s);
 	if (rv == SA_AIS_ERR_TRY_AGAIN) {
-		log_group(mg, "unlink ckpt status retry");
+		log_group(ls, "unlink ckpt status retry");
 		sleep(1);
 		goto status_retry;
 	}
 	if (rv != SA_AIS_OK) {
-		log_error("unlink ckpt status error %d %s", rv, mg->name);
+		log_error("unlink ckpt status error %d %s", rv, ls->name);
 		goto out_close;
 	}
 
-	log_group(mg, "unlink ckpt status: size %llu, max sections %u, "
+	log_group(ls, "unlink ckpt status: size %llu, max sections %u, "
 		      "max section size %llu, section count %u, mem %u",
 		 (unsigned long long)s.checkpointCreationAttributes.checkpointSize,
 		 s.checkpointCreationAttributes.maxSections,
@@ -1796,31 +1844,43 @@ int _unlink_checkpoint(struct mountgroup *mg, SaNameT *name)
 
 	rv = saCkptCheckpointClose(h);
 	if (rv == SA_AIS_ERR_TRY_AGAIN) {
-		log_group(mg, "unlink ckpt close retry");
+		log_group(ls, "unlink ckpt close retry");
 		sleep(1);
 		goto out_close;
 	}
 	if (rv != SA_AIS_OK) {
 		log_error("unlink ckpt %llx close err %d %s",
-			  (unsigned long long)h, rv, mg->name);
+			  (unsigned long long)h, rv, ls->name);
 		/* should we return an error here and possibly cause
 		   store_plocks() to fail on this? */
 		/* ret = -1; */
 	}
  out:
-	mg->cp_handle = 0;
+	ls->plock_ckpt_handle = 0;
 	return ret;
 }
 
-int unlink_checkpoint(struct mountgroup *mg)
+void close_plock_checkpoint(struct lockspace *ls)
 {
-	SaNameT name;
-	int len;
+	SaCkptCheckpointHandleT h;
+	SaAisErrorT rv;
 
-	len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "gfsplock.%s",
-		       mg->name);
-	name.length = len;
-	return _unlink_checkpoint(mg, &name);
+	h = (SaCkptCheckpointHandleT) ls->plock_ckpt_handle;
+	if (!h)
+		return;
+ retry:
+	rv = saCkptCheckpointClose(h);
+	if (rv == SA_AIS_ERR_TRY_AGAIN) {
+		log_group(ls, "close_plock_checkpoint retry");
+		sleep(1);
+		goto retry;
+	}
+	if (rv != SA_AIS_OK) {
+		log_error("close_plock_checkpoint %llx err %d %s",
+			  (unsigned long long)h, rv, ls->name);
+	}
+
+	ls->plock_ckpt_handle = 0;
 }
 
 /*
@@ -1847,7 +1907,7 @@ int unlink_checkpoint(struct mountgroup *mg)
    it.  The ckpt should then disappear and the new node can create a new ckpt
    for the next mounter. */
 
-void store_plocks(struct mountgroup *mg, int nodeid)
+void store_plocks(struct lockspace *ls)
 {
 	SaCkptCheckpointCreationAttributesT attr;
 	SaCkptCheckpointHandleT h;
@@ -1863,25 +1923,21 @@ void store_plocks(struct mountgroup *mg, int nodeid)
 	int r_count, lock_count, total_size, section_size, max_section_size;
 	int len, owner;
 
-	if (!plocks_online)
+	if (!cfgd_enable_plock)
 		return;
 
 	/* no change to plock state since we created the last checkpoint */
-	if (mg->last_checkpoint_time > mg->last_plock_time) {
-		log_group(mg, "store_plocks: saved ckpt uptodate");
+	if (ls->last_checkpoint_time > ls->last_plock_time) {
+		log_group(ls, "store_plocks: saved ckpt uptodate");
 		goto out;
 	}
-	mg->last_checkpoint_time = time(NULL);
+	ls->last_checkpoint_time = time(NULL);
 
-	len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "gfsplock.%s",
-		       mg->name);
+	len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "dlmplock.%s",
+		       ls->name);
 	name.length = len;
 
-	/* unlink an old checkpoint before we create a new one */
-	if (mg->cp_handle) {
-		if (_unlink_checkpoint(mg, &name))
-			return;
-	}
+	_unlink_checkpoint(ls, &name);
 
 	/* loop through all plocks to figure out sizes to set in
 	   the attr fields */
@@ -1891,7 +1947,7 @@ void store_plocks(struct mountgroup *mg, int nodeid)
 	total_size = 0;
 	max_section_size = 0;
 
-	list_for_each_entry(r, &mg->resources, list) {
+	list_for_each_entry(r, &ls->plock_resources, list) {
 		if (r->owner == -1)
 			continue;
 
@@ -1910,10 +1966,10 @@ void store_plocks(struct mountgroup *mg, int nodeid)
 			max_section_size = section_size;
 	}
 
-	log_group(mg, "store_plocks: r_count %d, lock_count %d, pp %u bytes",
+	log_group(ls, "store_plocks: r_count %d, lock_count %d, pp %u bytes",
 		  r_count, lock_count, (unsigned int)sizeof(struct pack_plock));
 
-	log_group(mg, "store_plocks: total %d bytes, max_section %d bytes",
+	log_group(ls, "store_plocks: total %d bytes, max_section %d bytes",
 		  total_size, max_section_size);
 
 	attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
@@ -1928,24 +1984,24 @@ void store_plocks(struct mountgroup *mg, int nodeid)
 		SA_CKPT_CHECKPOINT_CREATE;
 
  open_retry:
-	rv = saCkptCheckpointOpen(ckpt_handle, &name, &attr, flags, 0, &h);
+	rv = saCkptCheckpointOpen(system_ckpt_handle, &name,&attr,flags,0,&h);
 	if (rv == SA_AIS_ERR_TRY_AGAIN) {
-		log_group(mg, "store_plocks: ckpt open retry");
+		log_group(ls, "store_plocks: ckpt open retry");
 		sleep(1);
 		goto open_retry;
 	}
 	if (rv == SA_AIS_ERR_EXIST) {
-		log_group(mg, "store_plocks: ckpt already exists");
+		log_group(ls, "store_plocks: ckpt already exists");
 		return;
 	}
 	if (rv != SA_AIS_OK) {
-		log_error("store_plocks: ckpt open error %d %s", rv, mg->name);
+		log_error("store_plocks: ckpt open error %d %s", rv, ls->name);
 		return;
 	}
 
-	log_group(mg, "store_plocks: open ckpt handle %llx",
+	log_group(ls, "store_plocks: open ckpt handle %llx",
 		  (unsigned long long)h);
-	mg->cp_handle = (uint64_t) h;
+	ls->plock_ckpt_handle = (uint64_t) h;
 
 	/* - If r owner is -1, ckpt nothing.
 	   - If r owner is us, ckpt owner of us and no plocks.
@@ -1957,7 +2013,7 @@ void store_plocks(struct mountgroup *mg, int nodeid)
 	   - If r owner is 0 and got_unown, then ckpt owner 0 and all plocks;
 	     (there should be no SYNCING plocks) */
 
-	list_for_each_entry(r, &mg->resources, list) {
+	list_for_each_entry(r, &ls->plock_resources, list) {
 		if (r->owner == -1)
 			continue;
 		else if (r->owner == our_nodeid)
@@ -1975,12 +2031,8 @@ void store_plocks(struct mountgroup *mg, int nodeid)
 		}
 
 		memset(&buf, 0, sizeof(buf));
-		if (config_plock_ownership)
-			len = snprintf(buf, SECTION_NAME_LEN, "r%llu.%d",
-			       	       (unsigned long long)r->number, owner);
-		else
-			len = snprintf(buf, SECTION_NAME_LEN, "r%llu",
-			       	       (unsigned long long)r->number);
+		len = snprintf(buf, SECTION_NAME_LEN, "r%llu.%d",
+			       (unsigned long long)r->number, owner);
 
 		section_id.id = (void *)buf;
 		section_id.idLen = len + 1;
@@ -1990,49 +2042,42 @@ void store_plocks(struct mountgroup *mg, int nodeid)
 		memset(&section_buf, 0, sizeof(section_buf));
 		section_len = 0;
 
-		pack_section_buf(mg, r);
+		pack_section_buf(ls, r);
 
-		log_group(mg, "store_plocks: section size %u id %u \"%s\"",
+		log_group(ls, "store_plocks: section size %u id %u \"%s\"",
 			  section_len, section_id.idLen, buf);
 
 	 create_retry:
 		rv = saCkptSectionCreate(h, &section_attr, &section_buf,
 					 section_len);
 		if (rv == SA_AIS_ERR_TRY_AGAIN) {
-			log_group(mg, "store_plocks: ckpt create retry");
+			log_group(ls, "store_plocks: ckpt create retry");
 			sleep(1);
 			goto create_retry;
 		}
 		if (rv == SA_AIS_ERR_EXIST) {
 			/* this shouldn't happen in general */
-			log_group(mg, "store_plocks: clearing old ckpt");
+			log_group(ls, "store_plocks: clearing old ckpt");
+			/* do we need this close or will the close in
+			   the unlink function be ok? */
 			saCkptCheckpointClose(h);
-			_unlink_checkpoint(mg, &name);
+			_unlink_checkpoint(ls, &name);
 			goto open_retry;
 		}
 		if (rv != SA_AIS_OK) {
 			log_error("store_plocks: ckpt section create err %d %s",
-				  rv, mg->name);
+				  rv, ls->name);
 			break;
 		}
 	}
-
  out:
-	/* If the new nodeid is becoming the low nodeid it will now be in
-	   charge of creating ckpt's for mounters instead of us. */
-
-	if (nodeid < our_nodeid) {
-		log_group(mg, "store_plocks: closing ckpt for new low node %d",
-			  nodeid);
-		saCkptCheckpointClose(h);
-		mg->cp_handle = 0;
-	}
+	return;
 }
 
 /* called by a node that's just been added to the group to get existing plock
    state */
 
-void retrieve_plocks(struct mountgroup *mg)
+void retrieve_plocks(struct lockspace *ls)
 {
 	SaCkptCheckpointHandleT h;
 	SaCkptSectionIterationHandleT itr;
@@ -2043,39 +2088,39 @@ void retrieve_plocks(struct mountgroup *mg)
 	char buf[SECTION_NAME_LEN];
 	int len;
 
-	if (!plocks_online)
+	if (!cfgd_enable_plock)
 		return;
 
-	log_group(mg, "retrieve_plocks");
+	log_group(ls, "retrieve_plocks");
 
-	len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "gfsplock.%s",
-		       mg->name);
+	len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "dlmplock.%s",
+		       ls->name);
 	name.length = len;
 
  open_retry:
-	rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
+	rv = saCkptCheckpointOpen(system_ckpt_handle, &name, NULL,
 				  SA_CKPT_CHECKPOINT_READ, 0, &h);
 	if (rv == SA_AIS_ERR_TRY_AGAIN) {
-		log_group(mg, "retrieve_plocks: ckpt open retry");
+		log_group(ls, "retrieve_plocks: ckpt open retry");
 		sleep(1);
 		goto open_retry;
 	}
 	if (rv != SA_AIS_OK) {
 		log_error("retrieve_plocks: ckpt open error %d %s",
-			  rv, mg->name);
+			  rv, ls->name);
 		return;
 	}
 
  init_retry:
 	rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, 0, &itr);
 	if (rv == SA_AIS_ERR_TRY_AGAIN) {
-		log_group(mg, "retrieve_plocks: ckpt iterinit retry");
+		log_group(ls, "retrieve_plocks: ckpt iterinit retry");
 		sleep(1);
 		goto init_retry;
 	}
 	if (rv != SA_AIS_OK) {
 		log_error("retrieve_plocks: ckpt iterinit error %d %s",
-			  rv, mg->name);
+			  rv, ls->name);
 		goto out;
 	}
 
@@ -2085,13 +2130,13 @@ void retrieve_plocks(struct mountgroup *mg)
 		if (rv == SA_AIS_ERR_NO_SECTIONS)
 			break;
 		if (rv == SA_AIS_ERR_TRY_AGAIN) {
-			log_group(mg, "retrieve_plocks: ckpt iternext retry");
+			log_group(ls, "retrieve_plocks: ckpt iternext retry");
 			sleep(1);
 			goto next_retry;
 		}
 		if (rv != SA_AIS_OK) {
 			log_error("retrieve_plocks: ckpt iternext error %d %s",
-				  rv, mg->name);
+				  rv, ls->name);
 			goto out_it;
 		}
 
@@ -2105,24 +2150,24 @@ void retrieve_plocks(struct mountgroup *mg)
 
 		memset(&buf, 0, sizeof(buf));
 		snprintf(buf, SECTION_NAME_LEN, "%s", desc.sectionId.id);
-		log_group(mg, "retrieve_plocks: section size %llu id %u \"%s\"",
+		log_group(ls, "retrieve_plocks: section size %llu id %u \"%s\"",
 			  (unsigned long long)iov.dataSize, iov.sectionId.idLen,
 			  buf);
 
 	 read_retry:
 		rv = saCkptCheckpointRead(h, &iov, 1, NULL);
 		if (rv == SA_AIS_ERR_TRY_AGAIN) {
-			log_group(mg, "retrieve_plocks: ckpt read retry");
+			log_group(ls, "retrieve_plocks: ckpt read retry");
 			sleep(1);
 			goto read_retry;
 		}
 		if (rv != SA_AIS_OK) {
 			log_error("retrieve_plocks: ckpt read error %d %s",
-				  rv, mg->name);
+				  rv, ls->name);
 			goto out_it;
 		}
 
-		log_group(mg, "retrieve_plocks: ckpt read %llu bytes",
+		log_group(ls, "retrieve_plocks: ckpt read %llu bytes",
 			  (unsigned long long)iov.readSize);
 		section_len = iov.readSize;
 
@@ -2131,38 +2176,35 @@ void retrieve_plocks(struct mountgroup *mg)
 
 		if (section_len % sizeof(struct pack_plock)) {
 			log_error("retrieve_plocks: bad section len %d %s",
-				  section_len, mg->name);
+				  section_len, ls->name);
 			continue;
 		}
 
-		unpack_section_buf(mg, (char *)desc.sectionId.id,
+		unpack_section_buf(ls, (char *)desc.sectionId.id,
 				   desc.sectionId.idLen);
 	}
 
  out_it:
 	saCkptSectionIterationFinalize(itr);
  out:
-	if (mg->low_nodeid == our_nodeid) {
-		/* we're the new low nodeid, will be master */
-		log_group(mg, "retrieve_plocks: unlink ckpt from old master");
-		mg->cp_handle = (uint64_t) h;
-		_unlink_checkpoint(mg, &name);
-	} else
-		saCkptCheckpointClose(h);
+	saCkptCheckpointClose(h);
 }
 
 /* Called when a node has failed, or we're unmounting.  For a node failure, we
    need to call this when the cpg confchg arrives so that we're guaranteed all
    nodes do this in the same sequence wrt other messages. */
 
-void purge_plocks(struct mountgroup *mg, int nodeid, int unmount)
+void purge_plocks(struct lockspace *ls, int nodeid, int unmount)
 {
 	struct posix_lock *po, *po2;
 	struct lock_waiter *w, *w2;
 	struct resource *r, *r2;
 	int purged = 0;
 
-	list_for_each_entry_safe(r, r2, &mg->resources, list) {
+	if (!cfgd_enable_plock)
+		return;
+
+	list_for_each_entry_safe(r, r2, &ls->plock_resources, list) {
 		list_for_each_entry_safe(po, po2, &r->locks, list) {
 			if (po->nodeid == nodeid || unmount) {
 				list_del(&po->list);
@@ -2185,13 +2227,13 @@ void purge_plocks(struct mountgroup *mg, int nodeid, int unmount)
 
 		if (r->owner == nodeid) {
 			r->owner = 0;
-			send_pending_plocks(mg, r);
+			send_pending_plocks(ls, r);
 		}
 		
 		if (!list_empty(&r->waiters))
-			do_waiters(mg, r);
+			do_waiters(ls, r);
 
-		if (!config_plock_ownership &&
+		if (!cfgd_plock_ownership &&
 		    list_empty(&r->locks) && list_empty(&r->waiters)) {
 			list_del(&r->list);
 			free(r);
@@ -2199,21 +2241,14 @@ void purge_plocks(struct mountgroup *mg, int nodeid, int unmount)
 	}
 	
 	if (purged)
-		mg->last_plock_time = time(NULL);
-
-	log_group(mg, "purged %d plocks for %d", purged, nodeid);
-
-	/* we may have a saved ckpt that we created for the last mounter,
-	   we need to unlink it so another node can create a new ckpt for
-	   the next mounter after we leave */
+		ls->last_plock_time = time(NULL);
 
-	if (unmount && mg->cp_handle)
-		unlink_checkpoint(mg);
+	log_group(ls, "purged %d plocks for %d", purged, nodeid);
 }
 
 int dump_plocks(char *name, int fd)
 {
-	struct mountgroup *mg;
+	struct lockspace *ls;
 	struct posix_lock *po;
 	struct lock_waiter *w;
 	struct resource *r;
@@ -2223,11 +2258,11 @@ int dump_plocks(char *name, int fd)
 	if (!name)
 		return -1;
 
-	mg = find_mg(name);
-	if (!mg)
+	ls = find_ls(name);
+	if (!ls)
 		return -1;
 
-	list_for_each_entry(r, &mg->resources, list) {
+	list_for_each_entry(r, &ls->plock_resources, list) {
 		list_for_each_entry(po, &r->locks, list) {
 			snprintf(line, MAXLINE,
 			      "%llu %s %llu-%llu nodeid %d pid %u owner %llx\n",
diff --git a/group/include/list.h b/group/include/list.h
index 566b377..8100cbc 100644
--- a/group/include/list.h
+++ b/group/include/list.h
@@ -226,6 +226,17 @@ static inline void list_splice_init(struct list_head *list,
 	container_of(ptr, type, member)
 
 /**
+ * list_first_entry - get the first element from a list
+ * @ptr:        the list head to take the element from.
+ * @type:       the type of the struct this is embedded in.
+ * @member:     the name of the list_struct within the struct.
+ *
+ * Note, that list is expected to be not empty.
+ */
+#define list_first_entry(ptr, type, member) \
+	list_entry((ptr)->next, type, member)
+
+/**
  * list_for_each	-	iterate over a list
  * @pos:	the &struct list_head to use as a loop counter.
  * @head:	the head for your list.


hooks/post-receive
--
Cluster Project


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]