[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Cluster-devel] cluster/group/dlm_controld Makefile action.c d ...



CVSROOT:	/cvs/cluster
Module name:	cluster
Changes by:	teigland sourceware org	2007-07-24 18:15:43

Modified files:
	group/dlm_controld: Makefile action.c dlm_daemon.h group.c 
	                    main.c member_cman.c 
Added files:
	group/dlm_controld: deadlock.c dlm_controld.h 

Log message:
	add new code to find and resolve deadlocks, still incomplete, disabled
	by default

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/dlm_controld/deadlock.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/dlm_controld/dlm_controld.h.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/dlm_controld/Makefile.diff?cvsroot=cluster&r1=1.7&r2=1.8
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/dlm_controld/action.c.diff?cvsroot=cluster&r1=1.12&r2=1.13
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/dlm_controld/dlm_daemon.h.diff?cvsroot=cluster&r1=1.9&r2=1.10
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/dlm_controld/group.c.diff?cvsroot=cluster&r1=1.3&r2=1.4
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/dlm_controld/main.c.diff?cvsroot=cluster&r1=1.11&r2=1.12
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/dlm_controld/member_cman.c.diff?cvsroot=cluster&r1=1.7&r2=1.8

/cvs/cluster/cluster/group/dlm_controld/deadlock.c,v  -->  standard output
revision 1.1
--- cluster/group/dlm_controld/deadlock.c
+++ -	2007-07-24 18:15:43.735234000 +0000
@@ -0,0 +1,1496 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2007 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include "dlm_daemon.h"
+
+int deadlock_enabled;
+
+extern struct list_head lockspaces;
+extern int our_nodeid;
+
+static SaCkptHandleT global_ckpt_h;
+static SaCkptCallbacksT callbacks = { 0, 0 };
+static SaVersionT version = { 'B', 1, 1 };
+static char section_buf[10 * 1024 * 1024];  /* 10MB of pack_lock's enough? */
+static uint32_t section_len;
+static uint32_t section_max;
+
+struct node {
+	struct list_head	list;
+	int			nodeid;
+	int			checkpoint_ready;
+};
+
+/* from linux/fs/dlm/dlm_internal.h */
+#define DLM_LKSTS_WAITING       1
+#define DLM_LKSTS_GRANTED       2
+#define DLM_LKSTS_CONVERT       3
+
+struct pack_lock {
+	uint64_t		xid;
+	uint32_t		id;
+	int			nodeid;
+	uint32_t		remid;
+	int			ownpid;
+	uint32_t		exflags;
+	uint32_t		flags;
+	int8_t			status;
+	int8_t			grmode;
+	int8_t			rqmode;
+	int8_t			pad;
+};
+
+struct dlm_rsb {
+	struct list_head	list;
+	struct list_head	locks;
+	char			name[DLM_RESNAME_MAXLEN];
+	int			len;
+};
+
+struct dlm_lkb {
+	struct list_head        list;       /* r->locks */
+	struct pack_lock	lock;       /* data from debugfs/checkpoint */
+	unsigned int		time;       /* waiting time read from debugfs */
+	int			from;       /* node that checkpointed the lock */
+	struct dlm_rsb		*rsb;       /* lock against this resource */
+	struct trans		*trans;     /* lock owned by this transaction */
+	struct list_head	trans_list; /* tr->locks */
+};
+
+#define TR_NALLOC 4               /* waitfor pointers alloc'ed 4 at at time */
+
+struct trans {
+	struct list_head list;
+	struct list_head locks;
+	uint64_t xid;
+	int others_waiting_on_us; /* count of trans's pointing to us in waitfor */
+	int waitfor_alloc;
+	int waitfor_count;        /* count of in-use waitfor slots */
+	struct trans **waitfor;   /* waitfor_alloc trans pointers */
+};
+
+#define DLM_HEADER_MAJOR 1
+#define DLM_HEADER_MINOR 0
+#define DLM_HEADER_PATCH 0
+
+#define DLM_MSG_CYCLE_START 1
+#define DLM_MSG_CHECKPOINT_READY 2
+#define DLM_MSG_CANCEL_LOCK 3
+
+struct dlm_header {
+	uint16_t	version[3];
+	uint16_t	type; /* MSG_ */
+	uint32_t	nodeid; /* sender */
+	uint32_t	to_nodeid; /* 0 if to all */
+	uint32_t	global_id;
+	uint32_t	lock_id;
+	uint32_t	pad;
+	char		name[MAXNAME];
+};
+
+static const int __dlm_compat_matrix[8][8] = {
+      /* UN NL CR CW PR PW EX PD */
+        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
+        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
+        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
+        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
+        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
+        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
+        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
+        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
+};
+
+static inline int dlm_modes_compat(int mode1, int mode2)
+{
+	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
+}
+
+static void free_resources(struct lockspace *ls)
+{
+	struct dlm_rsb *r, *r_safe;
+	struct dlm_lkb *lkb, *lkb_safe;
+
+	list_for_each_entry_safe(r, r_safe, &ls->resources, list) {
+		list_for_each_entry_safe(lkb, lkb_safe, &r->locks, list) {
+			list_del(&lkb->list);
+			free(lkb);
+		}
+		list_del(&r->list);
+		free(r);
+	}
+}
+
+static void disable_deadlock(void)
+{
+	log_error("FIXME: deadlock detection disabled");
+}
+
+void setup_deadlock(void)
+{
+	SaAisErrorT rv;
+
+	if (!deadlock_enabled)
+		return;
+
+	rv = saCkptInitialize(&global_ckpt_h, &callbacks, &version);
+	if (rv != SA_AIS_OK)
+		log_error("ckpt init error %d", rv);
+}
+
+/* FIXME: use private data hooks into libcpg to save ls */
+
+static struct lockspace *find_ls_by_handle(cpg_handle_t h)
+{
+	struct lockspace *ls;
+
+	list_for_each_entry(ls, &lockspaces, list) {
+		if (ls->cpg_h == h)
+			return ls;
+	}
+	return NULL;
+}
+
+static struct dlm_rsb *get_resource(struct lockspace *ls, char *name, int len)
+{
+	struct dlm_rsb *r;
+
+	list_for_each_entry(r, &ls->resources, list) {
+		if (r->len == len && !strncmp(r->name, name, len))
+			return r;
+	}
+
+	r = malloc(sizeof(struct dlm_rsb));
+	if (!r) {
+		log_error("get_resource: no memory");
+		disable_deadlock();
+		return NULL;
+	}
+	memset(r, 0, sizeof(struct dlm_rsb));
+	memcpy(r->name, name, len);
+	r->len = len;
+	INIT_LIST_HEAD(&r->locks);
+	list_add(&r->list, &ls->resources);
+	return r;
+}
+
+static struct dlm_lkb *create_lkb(void)
+{
+	struct dlm_lkb *lkb;
+
+	lkb = malloc(sizeof(struct dlm_lkb));
+	if (!lkb) {
+		log_error("create_lkb: no memory");
+		disable_deadlock();
+	} else {
+		memset(lkb, 0, sizeof(struct dlm_lkb));
+		INIT_LIST_HEAD(&lkb->trans_list);
+	}
+	return lkb;
+}
+
+static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+	list_add(&lkb->list, &r->locks);
+	lkb->rsb = r;
+}
+
+#define LOCK_LINE_MAX 1024
+
+static void parse_r_name(char *line, char *name)
+{
+	char *p;
+	int i = 0;
+	int begin = 0;
+
+	for (p = line; ; p++) {
+		if (*p == '"') {
+			if (begin)
+				break;
+			begin = 1;
+			continue;
+		}
+		if (begin)
+			name[i++] = *p;
+	}
+}
+
+/* old/original way of dumping (only master state) in 5.1 kernel */
+
+static int read_debugfs_master(struct lockspace *ls)
+{
+	FILE *file;
+	char path[PATH_MAX];
+	char line[LOCK_LINE_MAX];
+	struct dlm_rsb *r;
+	struct dlm_lkb *lkb;
+	char r_name[65];
+	unsigned long long xid;
+	int r_len;
+	int rv;
+
+	snprintf(path, PATH_MAX, "/sys/kernel/debug/dlm/%s_master", ls->name);
+
+	file = fopen(path, "r");
+	if (!file)
+		return -1;
+
+	/* skip the header on the first line */
+	fgets(line, LOCK_LINE_MAX, file);
+
+	while (fgets(line, LOCK_LINE_MAX, file)) {
+		lkb = create_lkb();
+		if (!lkb)
+			break;
+
+		rv = sscanf(line, "%x %d %x %u %llu %x %hhd %hhd %hhd %u %d",
+			    &lkb->lock.id,
+			    &lkb->lock.nodeid,
+			    &lkb->lock.remid,
+			    &lkb->lock.ownpid,
+			    &xid,
+			    &lkb->lock.exflags,
+			    &lkb->lock.status,
+			    &lkb->lock.grmode,
+			    &lkb->lock.rqmode,
+			    &lkb->time,
+			    &r_len);
+
+		lkb->lock.xid = xid; /* hack to avoid warning */
+
+		log_debug("%s", line);
+
+		if (rv != 11) {
+			log_error("invalid debugfs line %d: %s", rv, line);
+			free(lkb);
+			goto out;
+		}
+
+		memset(r_name, 0, sizeof(r_name));
+		parse_r_name(line, r_name);
+
+		r = get_resource(ls, r_name, r_len);
+		if (!r)
+			break;
+		add_lkb(r, lkb);
+	}
+ out:
+	fclose(file);
+	return 0;
+}
+
+static int read_debugfs_locks(struct lockspace *ls)
+{
+	FILE *file;
+	char path[PATH_MAX];
+	char line[LOCK_LINE_MAX];
+	struct dlm_rsb *r;
+	struct dlm_lkb *lkb;
+	char r_name[65];
+	unsigned long long xid;
+	int r_nodeid;
+	int r_len;
+	int rv;
+
+	snprintf(path, PATH_MAX, "/sys/kernel/debug/dlm/%s_locks", ls->name);
+
+	file = fopen(path, "r");
+	if (!file)
+		return -1;
+
+	/* skip the header on the first line */
+	fgets(line, LOCK_LINE_MAX, file);
+
+	while (fgets(line, LOCK_LINE_MAX, file)) {
+		lkb = create_lkb();
+		if (!lkb)
+			break;
+
+		rv = sscanf(line, "%x %d %x %u %llu %x %x %hhd %hhd %hhd %u %d %d",
+			    &lkb->lock.id,
+			    &lkb->lock.nodeid,
+			    &lkb->lock.remid,
+			    &lkb->lock.ownpid,
+			    &xid,
+			    &lkb->lock.exflags,
+			    &lkb->lock.flags,
+			    &lkb->lock.status,
+			    &lkb->lock.grmode,
+			    &lkb->lock.rqmode,
+			    &lkb->time,
+			    &r_nodeid,
+			    &r_len);
+
+		lkb->lock.xid = xid; /* hack to avoid warning */
+
+		if (rv != 13) {
+			log_error("invalid debugfs line %d: %s", rv, line);
+			free(lkb);
+			goto out;
+		}
+
+		memset(r_name, 0, sizeof(r_name));
+		parse_r_name(line, r_name);
+
+		/* only collecting master lock state */
+		if (r_nodeid)
+			continue;
+
+		r = get_resource(ls, r_name, r_len);
+		if (!r)
+			break;
+		add_lkb(r, lkb);
+	}
+ out:
+	fclose(file);
+	return 0;
+}
+
+static int pack_lkb_list(struct list_head *q, struct pack_lock **lockp)
+{
+	struct dlm_lkb *lkb;
+	struct pack_lock *lock = *lockp;
+	int count = 0;
+
+	list_for_each_entry(lkb, q, list) {
+		if (count + 1 > section_max) {
+			log_error("too many locks %d for ckpt buf", count);
+			break;
+		}
+
+		lock->xid     = cpu_to_le64(lkb->lock.xid);
+		lock->id      = cpu_to_le32(lkb->lock.id);
+		lock->nodeid  = cpu_to_le32(lkb->lock.nodeid);
+		lock->remid   = cpu_to_le32(lkb->lock.remid);
+		lock->ownpid  = cpu_to_le32(lkb->lock.ownpid);
+		lock->exflags = cpu_to_le32(lkb->lock.exflags);
+		lock->flags   = cpu_to_le32(lkb->lock.flags);
+		lock->status  = lkb->lock.status;
+		lock->grmode  = lkb->lock.grmode;
+		lock->rqmode  = lkb->lock.rqmode;
+		lock->pad     = lkb->lock.pad;
+
+		lock++;
+		count++;
+	}
+	return count;
+}
+
+static void pack_section_buf(struct lockspace *ls, struct dlm_rsb *r)
+{
+	struct pack_lock *lock;
+	int count;
+
+	memset(&section_buf, 0, sizeof(section_buf));
+	section_max = sizeof(section_buf) / sizeof(struct pack_lock);
+
+	lock = (struct pack_lock *) &section_buf;
+
+	count = pack_lkb_list(&r->locks, &lock);
+
+	section_len = count * sizeof(struct pack_lock);
+}
+
+static int unpack_section_buf(struct lockspace *ls, int nodeid,
+			      char *numbuf, int buflen)
+{
+	struct dlm_rsb *r;
+	struct dlm_lkb *lkb;
+	struct pack_lock *lock;
+	int count = section_len / sizeof(struct pack_lock);
+	int i;
+
+	r = get_resource(ls, numbuf, buflen);
+	if (!r)
+		return -1;
+
+	lock = (struct pack_lock *) &section_buf;
+
+	for (i = 0; i < count; i++) {
+		lkb = create_lkb();
+		if (!lkb)
+			break;
+
+		lkb->lock.xid     = le64_to_cpu(lock->xid);
+		lkb->lock.id      = le32_to_cpu(lock->id);
+		lkb->lock.nodeid  = le32_to_cpu(lock->nodeid);
+		lkb->lock.remid   = le32_to_cpu(lock->remid);
+		lkb->lock.ownpid  = le32_to_cpu(lock->ownpid);
+		lkb->lock.exflags = le32_to_cpu(lock->exflags);
+		lkb->lock.flags   = le32_to_cpu(lock->flags);
+		lkb->lock.status  = lock->status;
+		lkb->lock.grmode  = lock->grmode;
+		lkb->lock.rqmode  = lock->rqmode;
+		lkb->lock.pad     = lock->pad;
+
+		lkb->from = nodeid;
+		add_lkb(r, lkb);
+		lock++;
+	}
+	return 0;
+}
+
+static int _unlink_checkpoint(struct lockspace *ls, SaNameT *name)
+{
+	SaCkptCheckpointHandleT h;
+	SaCkptCheckpointDescriptorT s;
+	SaAisErrorT rv;
+	int ret = 0;
+	int retries;
+
+	h = (SaCkptCheckpointHandleT) ls->lock_ckpt_handle;
+	log_group(ls, "unlink ckpt %llx", (unsigned long long)h);
+
+	retries = 0;
+ unlink_retry:
+	rv = saCkptCheckpointUnlink(global_ckpt_h, name);
+	if (rv == SA_AIS_ERR_TRY_AGAIN) {
+		log_group(ls, "unlink ckpt retry");
+		sleep(1);
+		if (retries++ < 10)
+			goto unlink_retry;
+	}
+	if (rv == SA_AIS_OK)
+		goto out_close;
+	if (!h)
+		goto out;
+
+	log_error("unlink ckpt error %d %s", rv, ls->name);
+	ret = -1;
+
+	retries = 0;
+ status_retry:
+	rv = saCkptCheckpointStatusGet(h, &s);
+	if (rv == SA_AIS_ERR_TRY_AGAIN) {
+		log_group(ls, "unlink ckpt status retry");
+		sleep(1);
+		if (retries++ < 10)
+			goto status_retry;
+	}
+	if (rv != SA_AIS_OK) {
+		log_error("unlink ckpt status error %d %s", rv, ls->name);
+		goto out_close;
+	}
+
+	log_group(ls, "unlink ckpt status: size %llu, max sections %u, "
+		      "max section size %llu, section count %u, mem %u",
+		 (unsigned long long)s.checkpointCreationAttributes.checkpointSize,
+		 s.checkpointCreationAttributes.maxSections,
+		 (unsigned long long)s.checkpointCreationAttributes.maxSectionSize,
+		 s.numberOfSections, s.memoryUsed);
+
+ out_close:
+	retries = 0;
+ close_retry:
+	rv = saCkptCheckpointClose(h);
+	if (rv == SA_AIS_ERR_TRY_AGAIN) {
+		log_group(ls, "unlink ckpt close retry");
+		sleep(1);
+		if (retries++ < 10)
+			goto close_retry;
+	}
+	if (rv != SA_AIS_OK) {
+		log_error("unlink ckpt %llx close err %d %s",
+			  (unsigned long long)h, rv, ls->name);
+	}
+ out:
+	ls->lock_ckpt_handle = 0;
+	return ret;
+}
+
+static int unlink_checkpoint(struct lockspace *ls)
+{
+	SaNameT name;
+	int len;
+
+	len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "dlmdeadlk.%s.%d",
+		       ls->name, our_nodeid);
+	name.length = len;
+
+	return _unlink_checkpoint(ls, &name);
+}
+
+static void read_checkpoint(struct lockspace *ls, int nodeid)
+{
+	SaCkptCheckpointHandleT h;
+	SaCkptSectionIterationHandleT itr;
+	SaCkptSectionDescriptorT desc;
+	SaCkptIOVectorElementT iov;
+	SaNameT name;
+	SaAisErrorT rv;
+	char buf[DLM_RESNAME_MAXLEN];
+	int len;
+	int retries;
+
+	log_group(ls, "read_checkpoint %d", nodeid);
+
+	if (nodeid == our_nodeid)
+		return;
+
+	len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "dlmdeadlk.%s.%d",
+		       ls->name, nodeid);
+	name.length = len;
+
+	retries = 0;
+ open_retry:
+	rv = saCkptCheckpointOpen(global_ckpt_h, &name, NULL,
+				  SA_CKPT_CHECKPOINT_READ, 0, &h);
+	if (rv == SA_AIS_ERR_TRY_AGAIN) {
+		log_group(ls, "read_checkpoint: %d ckpt open retry", nodeid);
+		sleep(1);
+		if (retries++ < 10)
+			goto open_retry;
+	}
+	if (rv != SA_AIS_OK) {
+		log_error("read_checkpoint: %d ckpt open error %d", nodeid, rv);
+		return;
+	}
+
+	retries = 0;
+ init_retry:
+	rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, 0, &itr);
+	if (rv == SA_AIS_ERR_TRY_AGAIN) {
+		log_group(ls, "read_checkpoint: ckpt iterinit retry");
+		sleep(1);
+		if (retries++ < 10)
+			goto init_retry;
+	}
+	if (rv != SA_AIS_OK) {
+		log_error("read_checkpoint: %d ckpt iterinit error %d", nodeid, rv);
+		goto out;
+	}
+
+	while (1) {
+		retries = 0;
+	 next_retry:
+		rv = saCkptSectionIterationNext(itr, &desc);
+		if (rv == SA_AIS_ERR_NO_SECTIONS)
+			break;
+		if (rv == SA_AIS_ERR_TRY_AGAIN) {
+			log_group(ls, "read_checkpoint: ckpt iternext retry");
+			sleep(1);
+			if (retries++ < 10)
+				goto next_retry;
+		}
+		if (rv != SA_AIS_OK) {
+			log_error("read_checkpoint: %d ckpt iternext error %d",
+				  nodeid, rv);
+			goto out_it;
+		}
+
+		if (!desc.sectionSize)
+			continue;
+
+		iov.sectionId = desc.sectionId;
+		iov.dataBuffer = &section_buf;
+		iov.dataSize = desc.sectionSize;
+		iov.dataOffset = 0;
+
+		memset(&buf, 0, sizeof(buf));
+		snprintf(buf, sizeof(buf), "%s", desc.sectionId.id);
+
+		log_group(ls, "read_checkpoint: section size %llu id %u \"%s\"",
+			  (unsigned long long)iov.dataSize,
+			  iov.sectionId.idLen, buf);
+
+		retries = 0;
+	 read_retry:
+		rv = saCkptCheckpointRead(h, &iov, 1, NULL);
+		if (rv == SA_AIS_ERR_TRY_AGAIN) {
+			log_group(ls, "read_checkpoint: ckpt read retry");
+			sleep(1);
+			if (retries++ < 10)
+				goto read_retry;
+		}
+		if (rv != SA_AIS_OK) {
+			log_error("read_checkpoint: %d ckpt read error %d",
+				  nodeid, rv);
+			goto out_it;
+		}
+
+		log_group(ls, "read_checkpoint: ckpt read %llu bytes",
+			  (unsigned long long)iov.readSize);
+		section_len = iov.readSize;
+
+		if (!section_len)
+		       continue;
+
+		if (section_len % sizeof(struct pack_lock)) {
+			log_error("read_checkpoint: %d bad section len %d",
+				  nodeid, section_len);
+			continue;
+		}
+
+		unpack_section_buf(ls, nodeid, (char *)desc.sectionId.id,
+				   desc.sectionId.idLen);
+	}
+
+ out_it:
+	saCkptSectionIterationFinalize(itr);
+	retries = 0;
+ out:
+	rv = saCkptCheckpointClose(h);
+	if (rv == SA_AIS_ERR_TRY_AGAIN) {
+		log_group(ls, "read_checkpoint: unlink ckpt close retry");
+		sleep(1);
+		if (retries++ < 10)
+			goto out;
+	}
+	if (rv != SA_AIS_OK)
+		log_error("read_checkpoint: %d close error %d", nodeid, rv);
+}
+
+static void write_checkpoint(struct lockspace *ls)
+{
+	SaCkptCheckpointCreationAttributesT attr;
+	SaCkptCheckpointHandleT h;
+	SaCkptSectionIdT section_id;
+	SaCkptSectionCreationAttributesT section_attr;
+	SaCkptCheckpointOpenFlagsT flags;
+	SaNameT name;
+	SaAisErrorT rv;
+	char buf[DLM_RESNAME_MAXLEN];
+	struct dlm_rsb *r;
+	struct dlm_lkb *lkb;
+	int r_count, lock_count, total_size, section_size, max_section_size;
+	int len;
+
+	len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "dlmdeadlk.%s.%d",
+		      ls->name, our_nodeid);
+	name.length = len;
+
+	/* unlink an old checkpoint before we create a new one */
+	if (ls->lock_ckpt_handle) {
+		if (_unlink_checkpoint(ls, &name))
+			return;
+	}
+
+	/* loop through all locks to figure out sizes to set in
+	   the attr fields */
+
+	r_count = 0;
+	lock_count = 0;
+	total_size = 0;
+	max_section_size = 0;
+
+	list_for_each_entry(r, &ls->resources, list) {
+		r_count++;
+		section_size = 0;
+		list_for_each_entry(lkb, &r->locks, list) {
+			section_size += sizeof(struct pack_lock);
+			lock_count++;
+		}
+		total_size += section_size;
+		if (section_size > max_section_size)
+			max_section_size = section_size;
+	}
+
+	log_group(ls, "write_checkpoint: r_count %d, lock_count %d",
+		  r_count, lock_count);
+
+	log_group(ls, "write_checkpoint: total %d bytes, max_section %d bytes",
+		  total_size, max_section_size);
+
+	attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
+	attr.checkpointSize = total_size;
+	attr.retentionDuration = SA_TIME_MAX;
+	attr.maxSections = r_count + 1;      /* don't know why we need +1 */
+	attr.maxSectionSize = max_section_size;
+	attr.maxSectionIdSize = DLM_RESNAME_MAXLEN;
+
+	flags = SA_CKPT_CHECKPOINT_READ |
+		SA_CKPT_CHECKPOINT_WRITE |
+		SA_CKPT_CHECKPOINT_CREATE;
+
+ open_retry:
+	rv = saCkptCheckpointOpen(global_ckpt_h, &name, &attr, flags, 0, &h);
+	if (rv == SA_AIS_ERR_TRY_AGAIN) {
+		log_group(ls, "write_checkpoint: ckpt open retry");
+		sleep(1);
+		goto open_retry;
+	}
+	if (rv == SA_AIS_ERR_EXIST) {
+		log_group(ls, "write_checkpoint: ckpt already exists");
+		return;
+	}
+	if (rv != SA_AIS_OK) {
+		log_group(ls, "write_checkpoint: ckpt open error %d", rv);
+		return;
+	}
+
+	log_group(ls, "write_checkpoint: open ckpt handle %llx", (long long)h);
+	ls->lock_ckpt_handle = (uint64_t) h;
+
+	list_for_each_entry(r, &ls->resources, list) {
+		memset(buf, 0, sizeof(buf));
+		len = snprintf(buf, sizeof(buf), "%s", r->name);
+
+		section_id.id = (void *)buf;
+		section_id.idLen = len + 1;
+		section_attr.sectionId = &section_id;
+		section_attr.expirationTime = SA_TIME_END;
+
+		pack_section_buf(ls, r);
+
+		log_group(ls, "write_checkpoint: section size %u id %u \"%s\"",
+			  section_len, section_id.idLen, buf);
+
+	 create_retry:
+		rv = saCkptSectionCreate(h, &section_attr, &section_buf,
+					 section_len);
+		if (rv == SA_AIS_ERR_TRY_AGAIN) {
+			log_group(ls, "write_checkpoint: ckpt create retry");
+			sleep(1);
+			goto create_retry;
+		}
+		if (rv == SA_AIS_ERR_EXIST) {
+			/* this shouldn't happen in general */
+			log_error("write_checkpoint: clearing old ckpt");
+			saCkptCheckpointClose(h);
+			_unlink_checkpoint(ls, &name);
+			goto open_retry;
+		}
+		if (rv != SA_AIS_OK) {
+			log_error("write_checkpoint: section create %d", rv);
+			break;
+		}
+	}
+}
+
+static int _send_message(cpg_handle_t h, void *buf, int len, int type)
+{
+	struct iovec iov;
+	cpg_error_t error;
+	int retries = 0;
+
+	iov.iov_base = buf;
+	iov.iov_len = len;
+ retry:
+	error = cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1);
+	if (error == CPG_ERR_TRY_AGAIN) {
+		retries++;
+		usleep(1000);
+		if (!(retries % 100))
+			log_error("cpg_mcast_joined retry %d", retries);
+		if (retries < 1000)
+			goto retry;
+	}
+	if (error != CPG_OK) {
+		log_error("cpg_mcast_joined error %d handle %llx",
+			  (int)error, (long long)h);
+		disable_deadlock();
+		return -1;
+	}
+	return 0;
+}
+
+static void send_message(struct lockspace *ls, int type)
+{
+	struct dlm_header *hd;
+	int len;
+	char *buf;
+
+	len = sizeof(struct dlm_header);
+	buf = malloc(len);
+	if (!buf) {
+		log_error("send_message: no memory");
+		disable_deadlock();
+		return;
+	}
+	memset(buf, 0, len);
+
+	hd = (struct dlm_header *)buf;
+	hd->version[0]  = cpu_to_le16(DLM_HEADER_MAJOR);
+	hd->version[1]  = cpu_to_le16(DLM_HEADER_MINOR);
+	hd->version[2]  = cpu_to_le16(DLM_HEADER_PATCH);
+	hd->type	= cpu_to_le16(type);
+	hd->nodeid      = cpu_to_le32(our_nodeid);
+	hd->to_nodeid   = 0;
+	hd->global_id   = cpu_to_le32(ls->global_id);
+	memcpy(hd->name, ls->name, strlen(ls->name));
+
+	_send_message(ls->cpg_h, buf, len, type);
+
+	free(buf);
+}
+
+static void send_checkpoint_ready(struct lockspace *ls)
+{
+	log_group(ls, "send_checkpoint_ready");
+	send_message(ls, DLM_MSG_CHECKPOINT_READY);
+}
+
+void send_cycle_start(struct lockspace *ls)
+{
+	if (!deadlock_enabled)
+		return;
+	log_group(ls, "send_cycle_start");
+	send_message(ls, DLM_MSG_CYCLE_START);
+}
+
+/* FIXME: where to send this?  we want to do the cancel on the node
+   where the transaction lives, which isn't always the master node that
+   sent us the info.  look at lkb->from and lkb->lock.nodeid, use
+   remid if sending to a process copy node */
+
+static void send_cancel_lock(struct lockspace *ls, struct trans *tr,
+			     struct dlm_lkb *lkb)
+{
+	struct dlm_header *hd;
+	int len;
+	char *buf;
+
+	log_group(ls, "send_cancel_lock");
+
+	len = sizeof(struct dlm_header);
+	buf = malloc(len);
+	if (!buf) {
+		log_error("send_message: no memory");
+		disable_deadlock();
+		return;
+	}
+	memset(buf, 0, len);
+
+	hd = (struct dlm_header *)buf;
+	hd->version[0]  = cpu_to_le16(DLM_HEADER_MAJOR);
+	hd->version[1]  = cpu_to_le16(DLM_HEADER_MINOR);
+	hd->version[2]  = cpu_to_le16(DLM_HEADER_PATCH);
+	hd->type	= cpu_to_le16(DLM_MSG_CANCEL_LOCK);
+	hd->nodeid      = cpu_to_le32(our_nodeid);
+	hd->to_nodeid   = 0;
+	hd->global_id   = cpu_to_le32(ls->global_id);
+	memcpy(hd->name, ls->name, strlen(ls->name));
+
+	_send_message(ls->cpg_h, buf, len, DLM_MSG_CANCEL_LOCK);
+
+	free(buf);
+}
+
+static void find_deadlock(struct lockspace *ls);
+
+static void receive_checkpoint_ready(struct lockspace *ls, int nodeid)
+{
+	struct node *node;
+	int not_ready = 0;
+
+	log_group(ls, "receive_checkpoint_ready %d", nodeid);
+
+	read_checkpoint(ls, nodeid);
+
+	/* when locks are read from all nodes, then search_deadlock()
+	   to do detection */
+
+	list_for_each_entry(node, &ls->nodes, list) {
+		if (node->nodeid == nodeid)
+			node->checkpoint_ready = 1;
+		if (!node->checkpoint_ready)
+			not_ready++;
+	}
+
+	if (not_ready) {
+		log_group(ls, "not_ready %d", not_ready);
+		return;
+	}
+
+	find_deadlock(ls);
+}
+
+static void receive_cycle_start(struct lockspace *ls, int nodeid)
+{
+	int rv;
+
+	log_group(ls, "receive_cycle_start %d", nodeid);
+
+	gettimeofday(&ls->last_deadlock_check, NULL);
+
+	rv = read_debugfs_locks(ls);
+	if (rv < 0) {
+		/* compat for RHEL5.1 kernels */
+		rv = read_debugfs_master(ls);
+		if (rv < 0) {
+			log_error("can't read dlm debugfs file: %s",
+				  strerror(errno));
+			return;
+		}
+	}
+
+	write_checkpoint(ls);
+	send_checkpoint_ready(ls);
+}
+
+static void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name,
+		uint32_t nodeid, uint32_t pid, void *data, int data_len)
+{
+	struct lockspace *ls;
+	struct dlm_header *hd;
+
+	ls = find_ls_by_handle(handle);
+	if (!ls)
+		return;
+
+	hd = (struct dlm_header *) data;
+
+	hd->version[0]  = le16_to_cpu(hd->version[0]);
+	hd->version[1]  = le16_to_cpu(hd->version[1]);
+	hd->version[2]  = le16_to_cpu(hd->version[2]);
+	hd->type	= le16_to_cpu(hd->type);
+	hd->nodeid      = le32_to_cpu(hd->nodeid);
+	hd->to_nodeid   = le32_to_cpu(hd->to_nodeid);
+	hd->global_id   = le32_to_cpu(hd->global_id);
+
+	if (hd->version[0] != DLM_HEADER_MAJOR) {
+		log_error("reject message version %u.%u.%u",
+			  hd->version[0], hd->version[1], hd->version[2]);
+		return;
+	}
+
+	switch (hd->type) {
+	case DLM_MSG_CYCLE_START:
+		receive_cycle_start(ls, hd->nodeid);
+		break;
+	case DLM_MSG_CHECKPOINT_READY:
+		receive_checkpoint_ready(ls, hd->nodeid);
+		break;
+	default:
+		log_error("unknown message type %d from %d",
+			  hd->type, hd->nodeid);
+	}
+}
+
+static void node_joined(struct lockspace *ls, int nodeid)
+{
+	struct node *node;
+
+	node = malloc(sizeof(struct node));
+	if (!node) {
+		log_error("node_joined: no memory");
+		disable_deadlock();
+		return;
+	}
+	memset(node, 0, sizeof(struct node));
+	node->nodeid = nodeid;
+	list_add_tail(&node->list, &ls->nodes);
+	log_group(ls, "node %d joined deadlock cpg", nodeid);
+}
+
+static void node_left(struct lockspace *ls, int nodeid, int reason)
+{
+	struct node *node, *safe;
+
+	list_for_each_entry_safe(node, safe, &ls->nodes, list) {
+		if (node->nodeid != nodeid)
+			continue;
+
+		/* TODO: purge locks from this node if we're in a cycle */
+
+		list_del(&node->list);
+		free(node);
+		log_group(ls, "node %d left deadlock cpg", nodeid);
+	}
+}
+
+static void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
+		struct cpg_address *member_list, int member_list_entries,
+		struct cpg_address *left_list, int left_list_entries,
+		struct cpg_address *joined_list, int joined_list_entries)
+{
+	struct lockspace *ls;
+	int i;
+
+	ls = find_ls_by_handle(handle);
+	if (!ls)
+		return;
+
+	for (i = 0; i < joined_list_entries; i++)
+		node_joined(ls, joined_list[i].nodeid);
+
+	for (i = 0; i < left_list_entries; i++)
+		node_left(ls, left_list[i].nodeid, left_list[i].reason);
+}
+
+static void process_deadlock_cpg(int ci)
+{
+	struct lockspace *ls;
+	cpg_error_t error;
+
+	ls = get_client_lockspace(ci);
+	if (!ls)
+		return;
+
+	error = cpg_dispatch(ls->cpg_h, CPG_DISPATCH_ONE);
+	if (error != CPG_OK)
+		log_error("cpg_dispatch error %d", error);
+}
+
+cpg_callbacks_t ls_callbacks = {
+	.cpg_deliver_fn = deliver_cb,
+	.cpg_confchg_fn = confchg_cb,
+};
+
+static void make_cpgname(struct lockspace *ls, struct cpg_name *cn)
+{
+	char name[MAXNAME+8];
+
+	memset(name, 0, sizeof(name));
+	strncpy(name, ls->name, sizeof(name));
+	strncat(name, "_deadlk", 7);
+	memset(cn, 0, sizeof(struct cpg_name));
+	strncpy(cn->value, name, strlen(name) + 1);
+	cn->length = strlen(name) + 1;
+}
+
+void join_deadlock_cpg(struct lockspace *ls)
+{
+	cpg_handle_t h;
+	struct cpg_name cpgname;
+	cpg_error_t error;
+	int retries = 0;
+	int fd, ci;
+
+	if (!deadlock_enabled)
+		return;
+
+	unlink_checkpoint(ls); /* not sure about this */
+
+	error = cpg_initialize(&h, &ls_callbacks);
+	if (error != CPG_OK) {
+		log_error("cpg_initialize error %d", error);
+		return;
+	}
+
+	cpg_fd_get(h, &fd);
+	if (fd < 0) {
+		log_error("cpg_fd_get error %d", error);
+		return;
+	}
+
+	ci = client_add(fd, process_deadlock_cpg, NULL);
+
+	make_cpgname(ls, &cpgname);
+
+ retry:
+	error = cpg_join(h, &cpgname);
+	if (error == CPG_ERR_TRY_AGAIN) {
+		sleep(1);
+		if (retries++ < 10)
+			goto retry;
+	}
+	if (error != CPG_OK) {
+		log_error("deadlk cpg join error %d", error);
+		goto fail;
+	}
+
+	ls->cpg_h = h;
+	ls->cpg_ci = ci;
+	set_client_lockspace(ci, ls);
+	log_group(ls, "deadlk cpg ci %d fd %d", ci, fd);
+	return;
+ fail:
+	cpg_finalize(h);
+	client_dead(ci);
+}
+
+void leave_deadlock_cpg(struct lockspace *ls)
+{
+	struct cpg_name cpgname;
+	cpg_error_t error;
+	int retries = 0;
+
+	if (!deadlock_enabled)
+		return;
+
+	make_cpgname(ls, &cpgname);
+ retry:
+	error = cpg_leave(ls->cpg_h, &cpgname);
+	if (error == CPG_ERR_TRY_AGAIN) {
+		sleep(1);
+		if (retries++ < 10)
+			goto retry;
+	}
+	if (error != CPG_OK)
+		log_error("deadlk cpg leave error %d", error);
+
+	cpg_finalize(ls->cpg_h);
+	client_dead(ls->cpg_ci);
+}
+
+static void add_lkb_trans(struct trans *tr, struct dlm_lkb *lkb)
+{
+	list_add(&lkb->trans_list, &tr->locks);
+	lkb->trans = tr;
+}
+
+static struct trans *get_trans(struct lockspace *ls, uint64_t xid)
+{
+	struct trans *tr;
+
+	list_for_each_entry(tr, &ls->transactions, list) {
+		if (tr->xid == xid)
+			return tr;
+	}
+
+	tr = malloc(sizeof(struct trans));
+	if (!tr) {
+		log_error("get_trans: no memory");
+		disable_deadlock();
+		return NULL;
+	}
+	memset(tr, 0, sizeof(struct trans));
+	tr->xid = xid;
+	tr->waitfor = NULL;
+	tr->waitfor_alloc = 0;
+	tr->waitfor_count = 0;
+	INIT_LIST_HEAD(&tr->locks);
+	list_add(&tr->list, &ls->transactions);
+	return tr;
+}
+
+/* for each rsb, for each lock, find/create trans, add lkb to the trans list */
+
+static void create_trans_list(struct lockspace *ls)
+{
+	struct dlm_rsb *r;
+	struct dlm_lkb *lkb;
+	struct trans *tr;
+	int r_count = 0, lkb_count = 0;
+
+	list_for_each_entry(r, &ls->resources, list) {
+		r_count++;
+		list_for_each_entry(lkb, &r->locks, list) {
+			lkb_count++;
+			tr = get_trans(ls, lkb->lock.xid);
+			if (!tr)
+				goto out;
+			add_lkb_trans(tr, lkb);
+		}
+	}
+ out:
+	log_group(ls, "create_trans_list: r_count %d lkb_count %d",
+		  r_count, lkb_count);
+}
+
+static int locks_compat(struct dlm_lkb *waiting_lkb,
+			struct dlm_lkb *granted_lkb)
+{
+	if (waiting_lkb == granted_lkb) {
+		log_debug("waiting and granted same lock");
+		return 0;
+	}
+
+	if (waiting_lkb->trans->xid == granted_lkb->trans->xid) {
+		log_debug("waiting and granted same trans %llx",
+			  (long long)waiting_lkb->trans->xid);
+		return 0;
+	}
+
+	return dlm_modes_compat(granted_lkb->lock.grmode,
+				waiting_lkb->lock.rqmode);
+}
+
+/* TODO: don't add new waitfor trans if we're already waiting for the same
+   trans for another lock */
+
+static void add_waitfor(struct dlm_lkb *waiting_lkb,
+			struct dlm_lkb *granted_lkb)
+{
+	struct trans *tr;
+	int old_alloc, i;
+
+	if (locks_compat(waiting_lkb, granted_lkb))
+		return;
+
+	tr = waiting_lkb->trans;
+
+	if (tr->waitfor_count == tr->waitfor_alloc) {
+		old_alloc = tr->waitfor_alloc;
+		tr->waitfor_alloc += TR_NALLOC;
+		tr->waitfor = realloc(tr->waitfor,
+				      tr->waitfor_alloc * sizeof(tr));
+		for (i = old_alloc; i < tr->waitfor_alloc; i++)
+			tr->waitfor[i] = NULL;
+	}
+
+	tr->waitfor[tr->waitfor_count++] = granted_lkb->trans;
+	granted_lkb->trans->others_waiting_on_us++;
+}
+
+/* for each trans, for each waiting lock, go to rsb of the lock,
+   find granted locks on that rsb, then find the trans the
+   granted lock belongs to, add that trans to our waitfor list */
+
+static void create_waitfor_graph(struct lockspace *ls)
+{
+	struct dlm_lkb *waiting_lkb, *granted_lkb;
+	struct dlm_rsb *r;
+	struct trans *tr;
+
+	list_for_each_entry(tr, &ls->transactions, list) {
+		list_for_each_entry(waiting_lkb, &tr->locks, trans_list) {
+			if (waiting_lkb->lock.status == DLM_LKSTS_GRANTED)
+				continue;
+			/* waiting_lkb status is CONVERT or WAITING */
+
+			r = waiting_lkb->rsb;
+
+			list_for_each_entry(granted_lkb, &r->locks, list) {
+				if (granted_lkb->lock.status==DLM_LKSTS_WAITING)
+					continue;
+				/* granted_lkb status is GRANTED or CONVERT */
+				add_waitfor(waiting_lkb, granted_lkb);
+			}
+		}
+	}
+}
+
+/* Assume a transaction that's not waiting on any locks will complete, release
+   all the locks it currently holds, and exit.  Other transactions that were
+   blocked waiting on the removed transaction's now-released locks may now be
+   unblocked, complete, release all held locks and exit.  Repeat this until
+   no more transactions can be removed.  If there are transactions remaining,
+   then they are deadlocked. */
+
+static void remove_waitfor(struct trans *tr, struct trans *remove_tr)
+{
+	int i;
+
+	for (i = 0; i < tr->waitfor_alloc; i++) {
+		if (!tr->waitfor_count)
+			break;
+
+		if (!tr->waitfor[i])
+			continue;
+
+		if (tr->waitfor[i] == remove_tr) {
+			tr->waitfor[i] = NULL;
+			tr->waitfor_count--;
+			remove_tr->others_waiting_on_us--;
+		}
+	}
+}
+
+/* remove_tr is not waiting for anything, assume it completes and goes away
+   and remove it from any other transaction's waitfor list */
+
+static void remove_trans(struct lockspace *ls, struct trans *remove_tr)
+{
+	struct trans *tr;
+
+	list_for_each_entry(tr, &ls->transactions, list) {
+		if (tr == remove_tr)
+			continue;
+		if (!remove_tr->others_waiting_on_us)
+			break;
+		remove_waitfor(tr, remove_tr);
+	}
+
+	if (remove_tr->others_waiting_on_us)
+		log_debug("trans %llx removed others waiting %d",
+			  (unsigned long long)remove_tr->xid,
+			  remove_tr->others_waiting_on_us);
+}
+
+static int reduce_waitfor_graph(struct lockspace *ls)
+{
+	struct trans *tr, *safe;
+	int blocked = 0;
+	int removed = 0;
+
+	list_for_each_entry_safe(tr, safe, &ls->transactions, list) {
+		if (tr->waitfor_count) {
+			blocked++;
+			continue;
+		}
+		remove_trans(ls, tr);
+		list_del(&tr->list);
+		if (tr->waitfor)
+			free(tr->waitfor);
+		free(tr);
+		removed++;
+	}
+
+	if (removed)
+		log_group(ls, "reduce_waitfor_graph: %d blocked, %d removed",
+			  blocked, removed);
+	return removed;
+}
+
+static void reduce_waitfor_graph_loop(struct lockspace *ls)
+{
+	int removed;
+
+	while (1) {
+		removed = reduce_waitfor_graph(ls);
+		if (!removed)
+			break;
+	}
+}
+
+static struct trans *find_trans_to_cancel(struct lockspace *ls)
+{
+	struct trans *tr;
+
+	list_for_each_entry(tr, &ls->transactions, list) {
+		if (!tr->others_waiting_on_us)
+			continue;
+		return tr;
+	}
+	return NULL;
+}
+
+static void cancel_trans(struct lockspace *ls)
+{
+	struct trans *tr;
+	struct dlm_lkb *lkb;
+	int removed;
+
+	tr = find_trans_to_cancel(ls);
+	if (!tr) {
+		log_group(ls, "cancel_trans: no trans found");
+		return;
+	}
+
+	list_for_each_entry(lkb, &tr->locks, trans_list) {
+		if (lkb->lock.status == DLM_LKSTS_GRANTED)
+			continue;
+		send_cancel_lock(ls, tr, lkb);
+		tr->waitfor_count--;
+	}
+
+	if (tr->waitfor_count)
+		log_group(ls, "canceled trans has non-zero waitfor_count %d",
+			  tr->waitfor_count);
+
+	/* this should now remove the canceled trans */
+	removed = reduce_waitfor_graph(ls);
+
+	if (!removed)
+		log_group(ls, "canceled trans not removed from graph");
+
+	/* now call reduce_waitfor_graph() in another loop and it
+	   should completely reduce */
+}
+
+static char *status_str(int lksts)
+{
+	switch (lksts) {
+	case DLM_LKSTS_WAITING:
+		return "W";
+	case DLM_LKSTS_GRANTED:
+		return "G";
+	case DLM_LKSTS_CONVERT:
+		return "C";
+	}
+	return "?";
+}
+
+static char *mode_str(int mode)
+{
+	switch (mode) {
+	case DLM_LOCK_IV:
+		return "IV";
+	case DLM_LOCK_NL:
+		return "NL";
+	case DLM_LOCK_CR:
+		return "CR";
+	case DLM_LOCK_CW:
+		return "CW";
+	case DLM_LOCK_PR:
+		return "PR";
+	case DLM_LOCK_PW:
+		return "PW";
+	case DLM_LOCK_EX:
+		return "EX";
+	}
+	return "??";
+}
+
+static void dump_trans(struct lockspace *ls, struct trans *tr)
+{
+	struct dlm_lkb *lkb;
+	struct trans *wf;
+	int i;
+
+	log_group(ls, "trans %llx waitfor_count %d others_waiting_on_us %d",
+		  (unsigned long long)tr->xid, tr->waitfor_count,
+		  tr->others_waiting_on_us);
+
+	log_group(ls, "locks:");
+	
+	list_for_each_entry(lkb, &tr->locks, trans_list) {
+		log_group(ls, "  %s: id %08x gr %s rq %s pid %u \"%s\"",
+			  status_str(lkb->lock.status),
+			  lkb->lock.id,
+			  mode_str(lkb->lock.grmode),
+			  mode_str(lkb->lock.rqmode),
+			  lkb->lock.ownpid,
+			  lkb->rsb->name);
+	}
+
+	if (!tr->waitfor_count)
+		return;
+
+	log_group(ls, "waitfor:");
+
+	for (i = 0; i < tr->waitfor_alloc; i++) {
+		if (!tr->waitfor[i])
+			continue;
+		wf = tr->waitfor[i];
+		log_group(ls, "  xid %llx", (unsigned long long)wf->xid);
+	}
+}
+
+static void dump_all_trans(struct lockspace *ls)
+{
+	struct trans *tr;
+
+	list_for_each_entry(tr, &ls->transactions, list)
+		dump_trans(ls, tr);
+}
+
+static void find_deadlock(struct lockspace *ls)
+{
+	if (list_empty(&ls->resources)) {
+		log_group(ls, "no resources no deadlock");
+		return;
+	}
+
+	create_trans_list(ls);
+	create_waitfor_graph(ls);
+
+	log_group(ls, "created waitfor graph:");
+	dump_all_trans(ls);
+
+	reduce_waitfor_graph_loop(ls);
+
+	if (list_empty(&ls->transactions)) {
+		log_group(ls, "no deadlock");
+		goto out;
+	}
+
+	log_group(ls, "found deadlock");
+	dump_all_trans(ls);
+
+	cancel_trans(ls);
+
+	reduce_waitfor_graph_loop(ls);
+
+	if (list_empty(&ls->transactions)) {
+		log_group(ls, "deadlock resolved with cancel");
+		goto out;
+	}
+
+	log_error("deadlock resolution failed");
+	dump_all_trans(ls);
+
+ out:
+	free_resources(ls);
+}
+
/cvs/cluster/cluster/group/dlm_controld/dlm_controld.h,v  -->  standard output
revision 1.1
--- cluster/group/dlm_controld/dlm_controld.h
+++ -	2007-07-24 18:15:43.826254000 +0000
@@ -0,0 +1,20 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2007 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#ifndef __DLM_CONTROLD_DOT_H__
+#define __DLM_CONTROLD_DOT_H__
+
+#define DLM_CONTROLD_SOCK_PATH	"dlm_controld_socket"
+#define DLM_CONTROLD_MSGLEN	256
+
+#endif
+
--- cluster/group/dlm_controld/Makefile	2007/06/01 09:45:35	1.7
+++ cluster/group/dlm_controld/Makefile	2007/07/24 18:15:43	1.8
@@ -17,15 +17,16 @@
 OBJS=	main.o \
 	member_cman.o \
 	group.o \
-	action.o
+	action.o \
+	deadlock.o
 
 CFLAGS += -g 
 CFLAGS += -I${ccsincdir} -I${cmanincdir}
 CFLAGS += -idirafter ${KERNEL_SRC}/include/linux
-CFLAGS += -I../../group/lib/ -I../include/
+CFLAGS += -I../../group/lib/ -I../include/ -I../../dlm/lib/
 CFLAGS += -I${incdir}
 
-LDFLAGS += -L${ccslibdir} -L${cmanlibdir} -lccs -lcman
+LDFLAGS += -L${ccslibdir} -L${cmanlibdir} -L${openaislibdir} -lccs -lcman -lcpg -lSaCkpt
 LDFLAGS += -L../lib -lgroup
 
 all: depends ${TARGET}
--- cluster/group/dlm_controld/action.c	2007/06/06 21:12:39	1.12
+++ cluster/group/dlm_controld/action.c	2007/07/24 18:15:43	1.13
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -47,7 +47,24 @@
 #define COMMS_DIR     "/sys/kernel/config/dlm/cluster/comms"
 
 
-static int do_write(int fd, void *buf, size_t count)
+int do_read(int fd, void *buf, size_t count)
+{
+	int rv, off = 0;
+
+	while (off < count) {
+		rv = read(fd, buf + off, count - off);
+		if (rv == 0)
+			return -1;
+		if (rv == -1 && errno == EINTR)
+			continue;
+		if (rv == -1)
+			return -1;
+		off += rv;
+	}
+	return 0;
+}
+
+int do_write(int fd, void *buf, size_t count)
 {
 	int rv, off = 0;
 
--- cluster/group/dlm_controld/dlm_daemon.h	2007/05/04 21:05:28	1.9
+++ cluster/group/dlm_controld/dlm_daemon.h	2007/07/24 18:15:43	1.10
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -40,9 +40,19 @@
 #include <syslog.h>
 #include <sched.h>
 #include <signal.h>
+#include <sys/time.h>
 #include <linux/netlink.h>
+#include <linux/genetlink.h>
+#include <linux/dlm.h>
+#include <linux/dlm_netlink.h>
+
+#include <openais/saAis.h>
+#include <openais/saCkpt.h>
+#include <openais/cpg.h>
 
+#include "dlm_controld.h"
 #include "list.h"
+#include "linux_endian.h"
 #include "libgroup.h"
 
 #define MAXARGS		8
@@ -68,11 +78,27 @@
 	syslog(LOG_ERR, fmt, ##args); \
 } while (0)
 
+#define log_group(ls, fmt, args...) \
+do { \
+	snprintf(daemon_debug_buf, 255, "%ld %s " fmt "\n", time(NULL), \
+		 (ls)->name, ##args); \
+	if (daemon_debug_opt) fprintf(stderr, "%s", daemon_debug_buf); \
+} while (0)
+
 
 struct lockspace {
 	struct list_head	list;
 	char			name[MAXNAME+1];
+	uint32_t		global_id;
 	int			joining;
+	int			cpg_ci;
+	cpg_handle_t		cpg_h;
+	SaCkptCheckpointHandleT lock_ckpt_handle;
+	struct list_head	transactions;
+	struct list_head	resources;
+	struct list_head	nodes;
+	struct timeval		last_deadlock_check;
+	unsigned int		timewarn_count;
 };
 
 /* action.c */
@@ -84,17 +110,23 @@
 int set_members(char *name, int new_count, int *new_members);
 int set_id(char *name, uint32_t id);
 void set_ccs_options(void);
+int do_read(int fd, void *buf, size_t count);
+int do_write(int fd, void *buf, size_t count);
 
 /* member_xxx.c */
 int setup_member(void);
-int process_member(void);
+void process_member(int ci);
 char *nodeid2name(int nodeid);
 
 /* group.c */
 int setup_groupd(void);
-int process_groupd(void);
+void process_groupd(int ci);
 
 /* main.c */
+int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci));
+void client_dead(int ci);
+void set_client_lockspace(int ci, struct lockspace *ls);
+struct lockspace *get_client_lockspace(int ci);
 struct lockspace *create_ls(char *name);
 struct lockspace *find_ls(char *name);
 
@@ -102,5 +134,11 @@
 int is_cman_member(int nodeid);
 void cman_statechange(void);
 
+/* deadlock.c */
+void setup_deadlock(void);
+void join_deadlock_cpg(struct lockspace *ls);
+void leave_deadlock_cpg(struct lockspace *ls);
+void send_cycle_start(struct lockspace *ls);
+
 #endif
 
--- cluster/group/dlm_controld/group.c	2006/10/13 16:03:47	1.3
+++ cluster/group/dlm_controld/group.c	2007/07/24 18:15:43	1.4
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -107,7 +107,7 @@
 	return str_members_buf;
 }
 
-int process_groupd(void)
+void process_groupd(int ci)
 {
 	struct lockspace *ls;
 	int error = 0, val;
@@ -155,13 +155,15 @@
 
 		/* this causes the dlm_new_lockspace() call (typically from
 		   mount) to complete */
-
 		set_event_done(cb_name, 0);
+
+		join_deadlock_cpg(ls);
 		break;
 
 	case DO_SETID:
 		log_debug("groupd callback: set_id %s %x", cb_name, cb_id);
 		set_id(cb_name, cb_id);
+		ls->global_id = cb_id;
 		break;
 
 	case DO_TERMINATE:
@@ -180,6 +182,7 @@
 		}
 
 		set_event_done(cb_name, val);
+		leave_deadlock_cpg(ls);
 		list_del(&ls->list);
 		free(ls);
 		break;
@@ -194,7 +197,7 @@
 
 	cb_action = 0;
  out:
-	return error;
+	return;
 }
 
 int setup_groupd(void)
@@ -203,7 +206,7 @@
 
 	gh = group_init(NULL, "dlm", 1, &callbacks, GROUPD_TIMEOUT);
 	if (!gh) {
-		log_error("group_init error %d %d", (int) gh, errno);
+		log_error("group_init error %p %d", gh, errno);
 		return -ENOTCONN;
 	}
 
--- cluster/group/dlm_controld/main.c	2007/05/04 21:05:28	1.11
+++ cluster/group/dlm_controld/main.c	2007/07/24 18:15:43	1.12
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -12,21 +12,107 @@
 
 #include "dlm_daemon.h"
 
-#define OPTION_STRING			"KDhV"
+#define OPTION_STRING			"KDhVd:"
 #define LOCKFILE_NAME			"/var/run/dlm_controld.pid"
 
-static int uevent_fd;
-static int groupd_fd;
-static int member_fd;
+#define DEADLOCK_CHECK_SECS		10
+
+#define NALLOC 16
 
 struct list_head lockspaces;
 
 extern group_handle_t gh;
+extern deadlock_enabled = 0;
+
+static int daemon_quit;
+static int client_maxi;
+static int client_size = 0;
+static struct client *client = NULL;
+static struct pollfd *pollfd = NULL;
+
+struct client {
+	int fd;
+	void *workfn;
+	void *deadfn;
+	struct lockspace *ls;
+};
+
+static void client_alloc(void)
+{
+	int i;
+
+	if (!client) {
+		client = malloc(NALLOC * sizeof(struct client));
+		pollfd = malloc(NALLOC * sizeof(struct pollfd));
+	} else {
+		client = realloc(client, (client_size + NALLOC) *
+					 sizeof(struct client));
+		pollfd = realloc(pollfd, (client_size + NALLOC) *
+					 sizeof(struct pollfd));
+		if (!pollfd)
+			log_error("can't alloc for pollfd");
+	}
+	if (!client || !pollfd)
+		log_error("can't alloc for client array");
+
+	for (i = client_size; i < client_size + NALLOC; i++) {
+		client[i].workfn = NULL;
+		client[i].deadfn = NULL;
+		client[i].fd = -1;
+		pollfd[i].fd = -1;
+		pollfd[i].revents = 0;
+	}
+	client_size += NALLOC;
+}
+
+void client_dead(int ci)
+{
+	close(client[ci].fd);
+	client[ci].workfn = NULL;
+	client[ci].fd = -1;
+	pollfd[ci].fd = -1;
+}
+
+int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci))
+{
+	int i;
+
+	if (!client)
+		client_alloc();
+ again:
+	for (i = 0; i < client_size; i++) {
+		if (client[i].fd == -1) {
+			client[i].workfn = workfn;
+			if (deadfn)
+				client[i].deadfn = deadfn;
+			else
+				client[i].deadfn = client_dead;
+			client[i].fd = fd;
+			pollfd[i].fd = fd;
+			pollfd[i].events = POLLIN;
+			if (i > client_maxi)
+				client_maxi = i;
+			return i;
+		}
+	}
+
+	client_alloc();
+	goto again;
+}
+
+void set_client_lockspace(int ci, struct lockspace *ls)
+{
+	client[ci].ls = ls;
+}
+
+struct lockspace *get_client_lockspace(int ci)
+{
+	return client[ci].ls;
+}
 
 static void sigterm_handler(int sig)
 {
-	if (list_empty(&lockspaces))
-		clear_configfs();
+	daemon_quit = 1;
 }
 
 struct lockspace *create_ls(char *name)
@@ -38,6 +124,9 @@
 		goto out;
 	memset(ls, 0, sizeof(*ls));
 	strncpy(ls->name, name, MAXNAME);
+	INIT_LIST_HEAD(&ls->transactions);
+	INIT_LIST_HEAD(&ls->resources);
+	INIT_LIST_HEAD(&ls->nodes);
  out:
 	return ls;
 }
@@ -54,25 +143,16 @@
 	return NULL;
 }
 
-#if 0
-void make_args(char *buf, int *argc, char **argv, char sep)
+struct lockspace *find_ls_id(uint32_t id)
 {
-	char *p = buf;
-	int i;
-
-	argv[0] = p;
+	struct lockspace *ls;
 
-	for (i = 1; i < MAXARGS; i++) {
-		p = strchr(buf, sep);
-		if (!p)
-			break;
-		*p = '\0';
-		argv[i] = p + 1;
-		buf = p + 1;
+	list_for_each_entry(ls, &lockspaces, list) {
+		if (ls->global_id == id)
+			return ls;
 	}
-	*argc = i;
+	return NULL;
 }
-#endif
 
 static char *get_args(char *buf, int *argc, char **argv, char sep, int want)
 {
@@ -108,7 +188,7 @@
 /* recv "online" (join) and "offline" (leave) 
    messages from dlm via uevents and pass them on to groupd */
 
-int process_uevent(void)
+static void process_uevent(int ci)
 {
 	struct lockspace *ls;
 	char buf[MAXLINE];
@@ -119,18 +199,18 @@
 	memset(argv, 0, sizeof(char *) * MAXARGS);
 
  retry_recv:
-	rv = recv(uevent_fd, &buf, sizeof(buf), 0);
+	rv = recv(client[ci].fd, &buf, sizeof(buf), 0);
 	if (rv == -1 && rv == EINTR)
 		goto retry_recv;
 	if (rv == -1 && rv == EAGAIN)
-		return 0;
+		return;
 	if (rv < 0) {
 		log_error("uevent recv error %d errno %d", rv, errno);
 		goto out;
 	}
 
 	if (!strstr(buf, "dlm"))
-		return 0;
+		return;
 
 	log_debug("uevent: %s", buf);
 
@@ -141,7 +221,7 @@
 	sys = argv[2];
 
 	if ((strlen(sys) != strlen("dlm")) || strcmp(sys, "dlm"))
-		return 0;
+		return;
 
 	log_debug("kernel: %s %s", act, argv[3]);
 
@@ -177,17 +257,16 @@
 	if (rv < 0)
 		log_error("process_uevent %s error %d errno %d",
 			  act, rv, errno);
-	return rv;
 }
 
-int setup_uevent(void)
+static int setup_uevent(void)
 {
 	struct sockaddr_nl snl;
 	int s, rv;
 
 	s = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_KOBJECT_UEVENT);
 	if (s < 0) {
-		log_error("netlink socket");
+		log_error("uevent netlink socket");
 		return s;
 	}
 
@@ -206,67 +285,335 @@
 	return s;
 }
 
-int loop(void)
+/* FIXME: look into using libnl/libnetlink */
+
+#define GENLMSG_DATA(glh)       ((void *)(NLMSG_DATA(glh) + GENL_HDRLEN))
+#define GENLMSG_PAYLOAD(glh)    (NLMSG_PAYLOAD(glh, 0) - GENL_HDRLEN)
+#define NLA_DATA(na)	    	((void *)((char*)(na) + NLA_HDRLEN))
+#define NLA_PAYLOAD(len)	(len - NLA_HDRLEN)
+
+/* Maximum size of response requested or message sent */
+#define MAX_MSG_SIZE    1024
+
+struct msgtemplate {
+	struct nlmsghdr n;
+	struct genlmsghdr g;
+	char buf[MAX_MSG_SIZE];
+};
+
+static int send_genetlink_cmd(int sd, uint16_t nlmsg_type, uint32_t nlmsg_pid,
+			      uint8_t genl_cmd, uint16_t nla_type,
+			      void *nla_data, int nla_len)
+{
+	struct nlattr *na;
+	struct sockaddr_nl nladdr;
+	int r, buflen;
+	char *buf;
+
+	struct msgtemplate msg;
+
+	msg.n.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN);
+	msg.n.nlmsg_type = nlmsg_type;
+	msg.n.nlmsg_flags = NLM_F_REQUEST;
+	msg.n.nlmsg_seq = 0;
+	msg.n.nlmsg_pid = nlmsg_pid;
+	msg.g.cmd = genl_cmd;
+	msg.g.version = 0x1;
+	na = (struct nlattr *) GENLMSG_DATA(&msg);
+	na->nla_type = nla_type;
+	na->nla_len = nla_len + 1 + NLA_HDRLEN;
+	if (nla_data)
+		memcpy(NLA_DATA(na), nla_data, nla_len);
+	msg.n.nlmsg_len += NLMSG_ALIGN(na->nla_len);
+
+	buf = (char *) &msg;
+	buflen = msg.n.nlmsg_len ;
+	memset(&nladdr, 0, sizeof(nladdr));
+	nladdr.nl_family = AF_NETLINK;
+	while ((r = sendto(sd, buf, buflen, 0, (struct sockaddr *) &nladdr,
+			   sizeof(nladdr))) < buflen) {
+		if (r > 0) {
+			buf += r;
+			buflen -= r;
+		} else if (errno != EAGAIN)
+			return -1;
+	}
+	return 0;
+}
+
+/*
+ * Probe the controller in genetlink to find the family id
+ * for the DLM family
+ */
+static int get_family_id(int sd)
+{
+	char genl_name[100];
+	struct {
+		struct nlmsghdr n;
+		struct genlmsghdr g;
+		char buf[256];
+	} ans;
+
+	int id, rc;
+	struct nlattr *na;
+	int rep_len;
+
+	strcpy(genl_name, DLM_GENL_NAME);
+	rc = send_genetlink_cmd(sd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY,
+				CTRL_ATTR_FAMILY_NAME, (void *)genl_name,
+				strlen(DLM_GENL_NAME)+1);
+
+	rep_len = recv(sd, &ans, sizeof(ans), 0);
+	if (ans.n.nlmsg_type == NLMSG_ERROR ||
+	    (rep_len < 0) || !NLMSG_OK((&ans.n), rep_len))
+		return 0;
+
+	na = (struct nlattr *) GENLMSG_DATA(&ans);
+	na = (struct nlattr *) ((char *) na + NLA_ALIGN(na->nla_len));
+	if (na->nla_type == CTRL_ATTR_FAMILY_ID) {
+		id = *(uint16_t *) NLA_DATA(na);
+	}
+	return id;
+}
+
+/* genetlink messages are timewarnings used as part of deadlock detection */
+
+static int setup_netlink(void)
 {
-	struct pollfd *pollfd;
-	int rv, i, maxi;
+	struct sockaddr_nl snl;
+	int s, rv;
+	uint16_t id;
+
+	s = socket(AF_NETLINK, SOCK_RAW, NETLINK_GENERIC);
+	if (s < 0) {
+		log_error("generic netlink socket");
+		return s;
+	}
+
+	memset(&snl, 0, sizeof(snl));
+	snl.nl_family = AF_NETLINK;
 
-	pollfd = malloc(MAXCON * sizeof(struct pollfd));
-	if (!pollfd)
+	rv = bind(s, (struct sockaddr *) &snl, sizeof(snl));
+	if (rv < 0) {
+		log_error("gen netlink bind error %d errno %d", rv, errno);
+		close(s);
+		return rv;
+	}
+
+	id = get_family_id(s);
+	if (!id) {
+		log_error("Error getting family id, errno %d", errno);
+		close(s);
 		return -1;
+	}
+
+	rv = send_genetlink_cmd(s, id, getpid(), DLM_CMD_HELLO, 0, NULL, 0);
+	if (rv < 0) {
+		log_error("error sending hello cmd, errno %d", errno);
+		close(s);
+		return -1;
+	}
 
-	rv = groupd_fd = setup_groupd();
+	return s;
+}
+
+static void process_timewarn(struct dlm_lock_data *data)
+{
+	struct lockspace *ls;
+	struct timeval now;
+
+	ls = find_ls_id(data->lockspace_id);
+	if (!ls)
+		return;
+
+	log_group(ls, "timewarn: lkid %x pid %d count %d",
+		  data->id, data->ownpid, ls->timewarn_count);
+
+	gettimeofday(&now, NULL);
+
+	if (now.tv_sec - ls->last_deadlock_check.tv_sec > DEADLOCK_CHECK_SECS) {
+		ls->timewarn_count = 0;
+		send_cycle_start(ls);
+	} else {
+		/* TODO: set a poll timeout and start another cycle after
+		   DEADLOCK_CHECK_SECS.  Want to save a record of all the
+		   warned locks to see if they're still blocked later before
+		   starting a cycle?  This would only be helpful if we
+		   experienced regular false-warnings, indicating that the
+		   timewarn setting should be larger. */
+		ls->timewarn_count++;
+	}
+}
+
+static void process_netlink(int ci)
+{
+	struct msgtemplate msg;
+	struct nlattr *na;
+	int len;
+
+	len = recv(client[ci].fd, &msg, sizeof(msg), 0);
+
+	if (len < 0) {
+		log_error("nonfatal netlink error: errno %d", errno);
+		return;
+	}
+
+	if (msg.n.nlmsg_type == NLMSG_ERROR || !NLMSG_OK((&msg.n), len)) {
+		struct nlmsgerr *err = NLMSG_DATA(&msg);
+		log_error("fatal netlink error: errno %d", err->error);
+		return;
+	}
+
+	na = (struct nlattr *) GENLMSG_DATA(&msg);
+
+	process_timewarn((struct dlm_lock_data *) NLA_DATA(na));
+}
+
+static void process_connection(int ci)
+{
+	char buf[DLM_CONTROLD_MSGLEN], *argv[MAXARGS];
+	int argc = 0, rv;
+	struct lockspace *ls;
+
+	memset(buf, 0, sizeof(buf));
+	memset(argv, 0, sizeof(char *) * MAXARGS);
+
+	rv = do_read(client[ci].fd, buf, DLM_CONTROLD_MSGLEN);
+	if (rv < 0) {
+		log_error("client %d fd %d read error %d %d", ci,
+			  client[ci].fd, rv, errno);
+		client_dead(ci);
+		return;
+	}
+
+	log_debug("ci %d read %s", ci, buf);
+
+	get_args(buf, &argc, argv, ' ', 2);
+
+	if (!strncmp(argv[0], "deadlock_check", 14)) {
+		ls = find_ls(argv[1]);
+		if (ls)
+			send_cycle_start(ls);
+		else
+			log_debug("deadlock_check ls name not found");
+	}
+}
+
+static void process_listener(int ci)
+{
+	int fd, i;
+
+	fd = accept(client[ci].fd, NULL, NULL);
+	if (fd < 0) {
+		log_error("process_listener: accept error %d %d", fd, errno);
+		return;
+	}
+	
+	i = client_add(fd, process_connection, NULL);
+
+	log_debug("client connection %d fd %d", i, fd);
+}
+
+static int setup_listener(void)
+{
+	struct sockaddr_un addr;
+	socklen_t addrlen;
+	int rv, s;
+
+	/* we listen for new client connections on socket s */
+
+	s = socket(AF_LOCAL, SOCK_STREAM, 0);
+	if (s < 0) {
+		log_error("socket error %d %d", s, errno);
+		return s;
+	}
+
+	memset(&addr, 0, sizeof(addr));
+	addr.sun_family = AF_LOCAL;
+	strcpy(&addr.sun_path[1], DLM_CONTROLD_SOCK_PATH);
+	addrlen = sizeof(sa_family_t) + strlen(addr.sun_path+1) + 1;
+
+	rv = bind(s, (struct sockaddr *) &addr, addrlen);
+	if (rv < 0) {
+		log_error("bind error %d %d", rv, errno);
+		close(s);
+		return rv;
+	}
+
+	rv = listen(s, 5);
+	if (rv < 0) {
+		log_error("listen error %d %d", rv, errno);
+		close(s);
+		return rv;
+	}
+	return s;
+}
+
+void cluster_dead(int ci)
+{
+	log_error("cluster is down, exiting");
+	clear_configfs();
+	exit(1);
+}
+
+static int loop(void)
+{
+	int rv, i;
+	void (*workfn) (int ci);
+	void (*deadfn) (int ci);
+
+	rv = setup_listener();
+	if (rv < 0)
+		goto out;
+	client_add(rv, process_listener, NULL);
+
+	rv = setup_groupd();
 	if (rv < 0)
 		goto out;
-	pollfd[0].fd = groupd_fd;
-	pollfd[0].events = POLLIN;
+	client_add(rv, process_groupd, cluster_dead);
 
-	rv = uevent_fd = setup_uevent();
+	rv = setup_uevent();
 	if (rv < 0)
 		goto out;
-	pollfd[1].fd = uevent_fd;
-	pollfd[1].events = POLLIN;
+	client_add(rv, process_uevent, NULL);
 
-	rv = member_fd = setup_member();
+	rv = setup_member();
 	if (rv < 0)
 		goto out;
-	pollfd[2].fd = member_fd;
-	pollfd[2].events = POLLIN;
+	client_add(rv, process_member, cluster_dead);
 
-	maxi = 2;
+	rv = setup_netlink();
+	if (rv < 0)
+		goto for_loop;
+	client_add(rv, process_netlink, NULL);
+
+ for_loop:
 
 	for (;;) {
-		rv = poll(pollfd, maxi + 1, -1);
-		if (rv == -1 && errno == EINTR)
+		rv = poll(pollfd, client_maxi + 1, -1);
+		if (rv == -1 && errno == EINTR) {
+			if (daemon_quit && list_empty(&lockspaces)) {
+				clear_configfs();
+				exit(1);
+			}
+			daemon_quit = 0;
 			continue;
+		}
 		if (rv < 0) {
 			log_error("poll errno %d", errno);
 			goto out;
 		}
 
-		for (i = 0; i <= maxi; i++) {
+		for (i = 0; i <= client_maxi; i++) {
+			if (client[i].fd < 0)
+				continue;
 			if (pollfd[i].revents & POLLIN) {
-				if (pollfd[i].fd == groupd_fd)
-					process_groupd();
-				else if (pollfd[i].fd == uevent_fd)
-					process_uevent();
-				else if (pollfd[i].fd == member_fd)
-					process_member();
+				workfn = client[i].workfn;
+				workfn(i);
 			}
-
 			if (pollfd[i].revents & POLLHUP) {
-				if (pollfd[i].fd == member_fd) {
-					log_error("cluster is down, exiting");
-					clear_configfs();
-					exit(1);
-				}
-				if (pollfd[i].fd == groupd_fd) {
-					log_error("groupd is down, exiting");
-					clear_configfs();
-					exit(1);
-				}
-				log_debug("closing fd %d", pollfd[i].fd);
-				close(pollfd[i].fd);
+				deadfn = client[i].deadfn;
+				deadfn(i);
 			}
 		}
 	}
@@ -346,6 +693,7 @@
 	printf("\n");
 	printf("Options:\n");
 	printf("\n");
+	printf("  -d <num>     Enable (1) or disable (0, default) deadlock code\n");     
 	printf("  -D	       Enable debugging code and don't fork\n");
 	printf("  -K	       Enable kernel dlm debugging messages\n");
 	printf("  -h	       Print this help, then exit\n");
@@ -375,6 +723,10 @@
 			exit(EXIT_SUCCESS);
 			break;
 
+		case 'd':
+			deadlock_enabled = atoi(optarg);
+			break;
+
 		case 'V':
 			printf("dlm_controld (built %s %s)\n", __DATE__, __TIME__);
 			/* printf("%s\n", REDHAT_COPYRIGHT); */
@@ -440,6 +792,8 @@
 	if (!daemon_debug_opt)
 		daemonize();
 
+	setup_deadlock();
+
 	signal(SIGTERM, sigterm_handler);
 
 	set_scheduler();
--- cluster/group/dlm_controld/member_cman.c	2007/05/04 21:05:28	1.7
+++ cluster/group/dlm_controld/member_cman.c	2007/07/24 18:15:43	1.8
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -13,15 +13,14 @@
 #include <libcman.h>
 #include "dlm_daemon.h"
 
+int			our_nodeid;
 static cman_handle_t	ch;
 static cman_node_t      old_nodes[MAX_NODES];
 static int              old_node_count;
 static cman_node_t      cman_nodes[MAX_NODES];
 static int              cman_node_count;
-static int		local_nodeid;
 extern struct list_head lockspaces;
 
-
 static int is_member(cman_node_t *node_list, int count, int nodeid)
 {
 	int i;
@@ -104,7 +103,7 @@
 					  cman_nodes[i].cn_address.cna_address,
 					  cman_nodes[i].cn_address.cna_addrlen,
 					  (cman_nodes[i].cn_nodeid ==
-					   local_nodeid));
+					   our_nodeid));
 		}
 	}
 }
@@ -126,7 +125,7 @@
 	}
 }
 
-int process_member(void)
+void process_member(int ci)
 {
 	int rv;
 
@@ -138,7 +137,6 @@
 		clear_configfs();
 		exit(1);
 	}
-	return 0;
 }
 
 int setup_member(void)
@@ -148,7 +146,7 @@
 
 	ch = cman_init(NULL);
 	if (!ch) {
-		log_error("cman_init error %d %d", (int) ch, errno);
+		log_error("cman_init error %p %d", ch, errno);
 		return -ENOTCONN;
 	}
 
@@ -171,7 +169,7 @@
 		fd = rv;
 		goto out;
 	}
-	local_nodeid = node.cn_nodeid;
+	our_nodeid = node.cn_nodeid;
 
 	old_node_count = 0;
 	memset(&old_nodes, 0, sizeof(old_nodes));


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]