[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Cluster-devel] cluster/group/dlm_controld deadlock.c dlm_daemon.h



CVSROOT:	/cvs/cluster
Module name:	cluster
Changes by:	teigland sourceware org	2007-08-10 20:23:08

Modified files:
	group/dlm_controld: deadlock.c dlm_daemon.h 

Log message:
	Detection and resolution now work with my basic deadlock tests.
	
	Had to rework how lock state is assembled.  The previous method was
	simpler and just gathered the master lock state from all nodes, but
	I failed to realize that the xid (transaction id) isn't synced to
	remote master copy locks.  So, now all lock state is saved in the
	checkpoints, both master-copy and process-copy (containing the xid),
	which are then merged to give the full view of the lock.

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/dlm_controld/deadlock.c.diff?cvsroot=cluster&r1=1.3&r2=1.4
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/dlm_controld/dlm_daemon.h.diff?cvsroot=cluster&r1=1.11&r2=1.12

--- cluster/group/dlm_controld/deadlock.c	2007/08/07 19:20:54	1.3
+++ cluster/group/dlm_controld/deadlock.c	2007/08/10 20:23:07	1.4
@@ -31,6 +31,11 @@
 	int			checkpoint_ready;
 };
 
+enum {
+	LOCAL_COPY = 1,
+	MASTER_COPY = 2,
+};
+
 /* from linux/fs/dlm/dlm_internal.h */
 #define DLM_LKSTS_WAITING       1
 #define DLM_LKSTS_GRANTED       2
@@ -47,7 +52,7 @@
 	int8_t			status;
 	int8_t			grmode;
 	int8_t			rqmode;
-	int8_t			pad;
+	int8_t			copy;
 };
 
 struct dlm_rsb {
@@ -57,12 +62,14 @@
 	int			len;
 };
 
+/* information is saved in the lkb, and lkb->lock, from the perspective of the
+   local or master copy, not the process copy */
+
 struct dlm_lkb {
 	struct list_head        list;       /* r->locks */
 	struct pack_lock	lock;       /* data from debugfs/checkpoint */
-	unsigned int		time;       /* waiting time read from debugfs */
-	int			from;       /* node that checkpointed the lock*/
-	struct dlm_rsb		*rsb;       /* lock against this resource */
+	int			home;       /* node where the lock owner lives*/
+	struct dlm_rsb		*rsb;       /* lock is on resource */
 	struct trans		*trans;     /* lock owned by this transaction */
 	struct list_head	trans_list; /* tr->locks */
 };
@@ -120,6 +127,19 @@
 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
 }
 
+static char *status_str(int lksts)
+{
+	switch (lksts) {
+	case DLM_LKSTS_WAITING:
+		return "W";
+	case DLM_LKSTS_GRANTED:
+		return "G";
+	case DLM_LKSTS_CONVERT:
+		return "C";
+	}
+	return "?";
+}
+
 static void free_resources(struct lockspace *ls)
 {
 	struct dlm_rsb *r, *r_safe;
@@ -128,6 +148,8 @@
 	list_for_each_entry_safe(r, r_safe, &ls->resources, list) {
 		list_for_each_entry_safe(lkb, lkb_safe, &r->locks, list) {
 			list_del(&lkb->list);
+			if (!list_empty(&lkb->trans_list))
+				list_del(&lkb->trans_list);
 			free(lkb);
 		}
 		list_del(&r->list);
@@ -135,6 +157,18 @@
 	}
 }
 
+static void free_transactions(struct lockspace *ls)
+{
+	struct trans *tr, *tr_safe;
+
+	list_for_each_entry_safe(tr, tr_safe, &ls->transactions, list) {
+		list_del(&tr->list);
+		if (tr->waitfor)
+			free(tr->waitfor);
+		free(tr);
+	}
+}
+
 static void disable_deadlock(void)
 {
 	log_error("FIXME: deadlock detection disabled");
@@ -198,6 +232,7 @@
 		disable_deadlock();
 	} else {
 		memset(lkb, 0, sizeof(struct dlm_lkb));
+		INIT_LIST_HEAD(&lkb->list);
 		INIT_LIST_HEAD(&lkb->trans_list);
 	}
 	return lkb;
@@ -209,7 +244,117 @@
 	lkb->rsb = r;
 }
 
-#define LOCK_LINE_MAX 1024
+/* from linux/fs/dlm/dlm_internal.h */
+#define IFL_MSTCPY 0x00010000
+
+/* called on a lock that's just been read from debugfs */
+
+static void set_copy(struct pack_lock *lock)
+{
+	uint32_t id, remid;
+
+	if (!lock->nodeid)
+		lock->copy = LOCAL_COPY;
+	else if (lock->flags & IFL_MSTCPY)
+		lock->copy = MASTER_COPY;
+	else {
+		/* process copy lock is converted to a partial master copy
+		   lock that will be combined with the real master copy */
+		lock->copy = MASTER_COPY;
+		id = lock->id;
+		remid = lock->remid;
+		lock->id = remid;
+		lock->remid = id;
+		lock->nodeid = our_nodeid;
+	}
+}
+
+/* xid is always zero in the real master copy, xid should always be non-zero
+   in the partial master copy (what was a process copy) */
+/* TODO: confirm or enforce that the partial will always have non-zero xid */
+
+static int partial_master_copy(struct pack_lock *lock)
+{
+	return (lock->xid != 0);
+}
+
+static struct dlm_lkb *get_lkb(struct dlm_rsb *r, struct pack_lock *lock)
+{
+	struct dlm_lkb *lkb;
+
+	if (lock->copy != MASTER_COPY)
+		goto out;
+
+	list_for_each_entry(lkb, &r->locks, list) {
+		if (lkb->lock.nodeid == lock->nodeid &&
+		    lkb->lock.id == lock->id)
+			return lkb;
+	}
+ out:
+	return create_lkb();
+}
+
+static void add_lock(struct lockspace *ls, struct dlm_rsb *r, int from_nodeid,
+		     struct pack_lock *lock)
+{
+	struct dlm_lkb *lkb;
+
+	lkb = get_lkb(r, lock);
+	if (!lkb)
+		return;
+
+	switch (lock->copy) {
+	case LOCAL_COPY:
+		lkb->lock.xid     = lock->xid;
+		lkb->lock.nodeid  = lock->nodeid;
+		lkb->lock.id      = lock->id;
+		lkb->lock.remid   = lock->remid;
+		lkb->lock.ownpid  = lock->ownpid;
+		lkb->lock.exflags = lock->exflags;
+		lkb->lock.flags   = lock->flags;
+		lkb->lock.status  = lock->status;
+		lkb->lock.grmode  = lock->grmode;
+		lkb->lock.rqmode  = lock->rqmode;
+		lkb->lock.copy    = LOCAL_COPY;
+		lkb->home = from_nodeid;
+
+		log_group(ls, "add %s local nodeid %d id %x remid %x xid %llx",
+			  r->name, lock->nodeid, lock->id, lock->remid,
+			  (unsigned long long)lock->xid);
+		break;
+
+	case MASTER_COPY:
+		if (partial_master_copy(lock)) {
+			lkb->lock.xid     = lock->xid;
+			lkb->lock.nodeid  = lock->nodeid;
+			lkb->lock.id      = lock->id;
+			lkb->lock.remid   = lock->remid;
+			lkb->lock.copy    = MASTER_COPY;
+		} else {
+			/* only set xid from partial master copy above */
+			lkb->lock.nodeid  = lock->nodeid;
+			lkb->lock.id      = lock->id;
+			lkb->lock.remid   = lock->remid;
+			lkb->lock.copy    = MASTER_COPY;
+			/* set other fields from real master copy */
+			lkb->lock.ownpid  = lock->ownpid;
+			lkb->lock.exflags = lock->exflags;
+			lkb->lock.flags   = lock->flags;
+			lkb->lock.status  = lock->status;
+			lkb->lock.grmode  = lock->grmode;
+			lkb->lock.rqmode  = lock->rqmode;
+		}
+		lkb->home = lock->nodeid;
+
+		log_group(ls, "add %s master nodeid %d id %x remid %x xid %llx",
+			  r->name, lock->nodeid, lock->id, lock->remid,
+			  (unsigned long long)lock->xid);
+		break;
+	}
+
+	if (list_empty(&lkb->list))
+		add_lkb(r, lkb);
+}
 
 static void parse_r_name(char *line, char *name)
 {
@@ -229,21 +374,23 @@
 	}
 }
 
-/* old/original way of dumping (only master state) in 5.1 kernel */
+#define LOCK_LINE_MAX 1024
 
-static int read_debugfs_master(struct lockspace *ls)
+static int read_debugfs_locks(struct lockspace *ls)
 {
 	FILE *file;
 	char path[PATH_MAX];
 	char line[LOCK_LINE_MAX];
 	struct dlm_rsb *r;
-	struct dlm_lkb *lkb;
+	struct pack_lock lock;
 	char r_name[65];
 	unsigned long long xid;
+	unsigned int waiting;
+	int r_nodeid;
 	int r_len;
 	int rv;
 
-	snprintf(path, PATH_MAX, "/sys/kernel/debug/dlm/%s_master", ls->name);
+	snprintf(path, PATH_MAX, "/sys/kernel/debug/dlm/%s_locks", ls->name);
 
 	file = fopen(path, "r");
 	if (!file)
@@ -253,30 +400,27 @@
 	fgets(line, LOCK_LINE_MAX, file);
 
 	while (fgets(line, LOCK_LINE_MAX, file)) {
-		lkb = create_lkb();
-		if (!lkb)
-			break;
+		memset(&lock, 0, sizeof(struct pack_lock));
 
-		rv = sscanf(line, "%x %d %x %u %llu %x %hhd %hhd %hhd %u %d",
-			    &lkb->lock.id,
-			    &lkb->lock.nodeid,
-			    &lkb->lock.remid,
-			    &lkb->lock.ownpid,
+		rv = sscanf(line, "%x %d %x %u %llu %x %x %hhd %hhd %hhd %u %d %d",
+			    &lock.id,
+			    &lock.nodeid,
+			    &lock.remid,
+			    &lock.ownpid,
 			    &xid,
-			    &lkb->lock.exflags,
-			    &lkb->lock.status,
-			    &lkb->lock.grmode,
-			    &lkb->lock.rqmode,
-			    &lkb->time,
+			    &lock.exflags,
+			    &lock.flags,
+			    &lock.status,
+			    &lock.grmode,
+			    &lock.rqmode,
+			    &waiting,
+			    &r_nodeid,
 			    &r_len);
 
-		lkb->lock.xid = xid; /* hack to avoid warning */
-
-		log_debug("%s", line);
+		lock.xid = xid; /* hack to avoid warning */
 
-		if (rv != 11) {
+		if (rv != 13) {
 			log_error("invalid debugfs line %d: %s", rv, line);
-			free(lkb);
 			goto out;
 		}
 
@@ -286,77 +430,41 @@
 		r = get_resource(ls, r_name, r_len);
 		if (!r)
 			break;
-		add_lkb(r, lkb);
+
+		set_copy(&lock);
+		add_lock(ls, r, our_nodeid, &lock);
 	}
  out:
 	fclose(file);
 	return 0;
 }
 
-static int read_debugfs_locks(struct lockspace *ls)
+static int read_checkpoint_locks(struct lockspace *ls, int from_nodeid,
+			         char *numbuf, int buflen)
 {
-	FILE *file;
-	char path[PATH_MAX];
-	char line[LOCK_LINE_MAX];
 	struct dlm_rsb *r;
-	struct dlm_lkb *lkb;
-	char r_name[65];
-	unsigned long long xid;
-	int r_nodeid;
-	int r_len;
-	int rv;
-
-	snprintf(path, PATH_MAX, "/sys/kernel/debug/dlm/%s_locks", ls->name);
+	struct pack_lock *lock;
+	int count = section_len / sizeof(struct pack_lock);
+	int i;
 
-	file = fopen(path, "r");
-	if (!file)
+	r = get_resource(ls, numbuf, buflen - 1);
+	if (!r)
 		return -1;
 
-	/* skip the header on the first line */
-	fgets(line, LOCK_LINE_MAX, file);
-
-	while (fgets(line, LOCK_LINE_MAX, file)) {
-		lkb = create_lkb();
-		if (!lkb)
-			break;
-
-		rv = sscanf(line, "%x %d %x %u %llu %x %x %hhd %hhd %hhd %u %d %d",
-			    &lkb->lock.id,
-			    &lkb->lock.nodeid,
-			    &lkb->lock.remid,
-			    &lkb->lock.ownpid,
-			    &xid,
-			    &lkb->lock.exflags,
-			    &lkb->lock.flags,
-			    &lkb->lock.status,
-			    &lkb->lock.grmode,
-			    &lkb->lock.rqmode,
-			    &lkb->time,
-			    &r_nodeid,
-			    &r_len);
-
-		lkb->lock.xid = xid; /* hack to avoid warning */
-
-		if (rv != 13) {
-			log_error("invalid debugfs line %d: %s", rv, line);
-			free(lkb);
-			goto out;
-		}
-
-		memset(r_name, 0, sizeof(r_name));
-		parse_r_name(line, r_name);
+	lock = (struct pack_lock *) &section_buf;
 
-		/* only collecting master lock state */
-		if (r_nodeid)
-			continue;
+	for (i = 0; i < count; i++) {
+		lock->xid     = le64_to_cpu(lock->xid);
+		lock->id      = le32_to_cpu(lock->id);
+		lock->nodeid  = le32_to_cpu(lock->nodeid);
+		lock->remid   = le32_to_cpu(lock->remid);
+		lock->ownpid  = le32_to_cpu(lock->ownpid);
+		lock->exflags = le32_to_cpu(lock->exflags);
+		lock->flags   = le32_to_cpu(lock->flags);
 
-		r = get_resource(ls, r_name, r_len);
-		if (!r)
-			break;
-		add_lkb(r, lkb);
+		add_lock(ls, r, from_nodeid, lock);
+		lock++;
 	}
- out:
-	fclose(file);
 	return 0;
 }
 
@@ -382,7 +490,7 @@
 		lock->status  = lkb->lock.status;
 		lock->grmode  = lkb->lock.grmode;
 		lock->rqmode  = lkb->lock.rqmode;
-		lock->pad     = lkb->lock.pad;
+		lock->copy    = lkb->lock.copy;
 
 		lock++;
 		count++;
@@ -405,45 +513,6 @@
 	section_len = count * sizeof(struct pack_lock);
 }
 
-static int unpack_section_buf(struct lockspace *ls, int nodeid,
-			      char *numbuf, int buflen)
-{
-	struct dlm_rsb *r;
-	struct dlm_lkb *lkb;
-	struct pack_lock *lock;
-	int count = section_len / sizeof(struct pack_lock);
-	int i;
-
-	r = get_resource(ls, numbuf, buflen);
-	if (!r)
-		return -1;
-
-	lock = (struct pack_lock *) &section_buf;
-
-	for (i = 0; i < count; i++) {
-		lkb = create_lkb();
-		if (!lkb)
-			break;
-
-		lkb->lock.xid     = le64_to_cpu(lock->xid);
-		lkb->lock.id      = le32_to_cpu(lock->id);
-		lkb->lock.nodeid  = le32_to_cpu(lock->nodeid);
-		lkb->lock.remid   = le32_to_cpu(lock->remid);
-		lkb->lock.ownpid  = le32_to_cpu(lock->ownpid);
-		lkb->lock.exflags = le32_to_cpu(lock->exflags);
-		lkb->lock.flags   = le32_to_cpu(lock->flags);
-		lkb->lock.status  = lock->status;
-		lkb->lock.grmode  = lock->grmode;
-		lkb->lock.rqmode  = lock->rqmode;
-		lkb->lock.pad     = lock->pad;
-
-		lkb->from = nodeid;
-		add_lkb(r, lkb);
-		lock++;
-	}
-	return 0;
-}
-
 static int _unlink_checkpoint(struct lockspace *ls, SaNameT *name)
 {
 	SaCkptCheckpointHandleT h;
@@ -536,11 +605,11 @@
 	int len;
 	int retries;
 
-	log_group(ls, "read_checkpoint %d", nodeid);
-
 	if (nodeid == our_nodeid)
 		return;
 
+	log_group(ls, "read_checkpoint %d", nodeid);
+
 	len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "dlmdeadlk.%s.%d",
 		       ls->name, nodeid);
 	name.length = len;
@@ -635,8 +704,8 @@
 			continue;
 		}
 
-		unpack_section_buf(ls, nodeid, (char *)desc.sectionId.id,
-				   desc.sectionId.idLen);
+		read_checkpoint_locks(ls, nodeid, (char *)desc.sectionId.id,
+				      desc.sectionId.idLen);
 	}
 
  out_it:
@@ -861,19 +930,15 @@
 	}
 	memset(buf, 0, len);
 
-	if (lkb->lock.nodeid) {
-		/* this was MSTCPY lkb from master node */
-		to_nodeid = lkb->lock.nodeid;
-		lkid = lkb->lock.remid;
-	} else {
-		/* process-copy lkb from node where lock is held */
-		to_nodeid = lkb->from;
+	if (!lkb->lock.nodeid)
 		lkid = lkb->lock.id;
-	}
+	else
+		lkid = lkb->lock.remid;
+	to_nodeid = lkb->home;
 
-	log_group(ls, "send_cancel_lock to %x lkid %d, id %x remid %x "
-		  "nodeid %d from %d", to_nodeid, lkid,
-		  lkb->lock.id, lkb->lock.remid, lkb->lock.nodeid, lkb->from);
+	log_group(ls, "send_cancel_lock to nodeid %d rsb %s id %x xid %llx",
+		  to_nodeid, lkb->rsb->name, lkid,
+		  (unsigned long long)lkb->lock.xid);
 
 	hd = (struct dlm_header *)buf;
 	hd->version[0]  = cpu_to_le16(DLM_HEADER_MAJOR);
@@ -891,14 +956,38 @@
 	free(buf);
 }
 
+static void dump_resources(struct lockspace *ls)
+{
+	struct dlm_rsb *r;
+	struct dlm_lkb *lkb;
+
+	log_group(ls, "Resource dump:");
+
+	list_for_each_entry(r, &ls->resources, list) {
+		log_group(ls, "\"%s\" len %d", r->name, r->len);
+		list_for_each_entry(lkb, &r->locks, list) {
+			log_group(ls, "  %s: nodeid %d id %08x remid %08x gr %s rq %s pid %u xid %llx",
+			  	  status_str(lkb->lock.status),
+				  lkb->lock.nodeid,
+			  	  lkb->lock.id,
+			  	  lkb->lock.remid,
+			  	  dlm_mode_str(lkb->lock.grmode),
+			          dlm_mode_str(lkb->lock.rqmode),
+			          lkb->lock.ownpid,
+				  (unsigned long long)lkb->lock.xid);
+		}
+	}
+}
+
 static void find_deadlock(struct lockspace *ls);
 
 static void receive_checkpoint_ready(struct lockspace *ls, int nodeid)
 {
 	struct node *node;
 	int not_ready = 0;
+	int low = -1;
 
-	log_group(ls, "receive_checkpoint_ready %d", nodeid);
+	log_group(ls, "receive_checkpoint_ready from %d", nodeid);
 
 	read_checkpoint(ls, nodeid);
 
@@ -910,12 +999,19 @@
 			node->checkpoint_ready = 1;
 		if (!node->checkpoint_ready)
 			not_ready++;
-	}
 
-	if (not_ready) {
-		log_group(ls, "not_ready %d", not_ready);
+		log_group(ls, "nodeid %d checkpoint_ready = %d",
+			  node->nodeid, node->checkpoint_ready);
+	}
+	if (not_ready)
 		return;
+
+	list_for_each_entry(node, &ls->nodes, list) {
+		if (node->nodeid < low || low == -1)
+			low = node->nodeid;
 	}
+	ls->low_nodeid = low;
+	log_group(ls, "low nodeid in charge of resolution is %d", low);
 
 	find_deadlock(ls);
 }
@@ -930,8 +1026,10 @@
 
 	rv = read_debugfs_locks(ls);
 	if (rv < 0) {
+#if 0
 		/* compat for RHEL5.1 kernels */
 		rv = read_debugfs_master(ls);
+#endif
 		if (rv < 0) {
 			log_error("can't read dlm debugfs file: %s",
 				  strerror(errno));
@@ -962,8 +1060,8 @@
 
 	rv = dlm_ls_deadlock_cancel(h, lkid, 0);
 	if (rv < 0) {
-		log_error("deadlock cancel %x from %x lib cancel error %d",
-			  lkid, nodeid, rv);
+		log_error("deadlock cancel %x from %x lib cancel errno %d",
+			  lkid, nodeid, errno);
 	}
 
 	dlm_close_lockspace(h);
@@ -1055,6 +1153,13 @@
 	if (!ls)
 		return;
 
+	if (!ls->got_first_confchg) {
+		ls->got_first_confchg = 1;
+		for (i = 0; i < member_list_entries; i++)
+			node_joined(ls, member_list[i].nodeid);
+		return;
+	}
+
 	for (i = 0; i < joined_list_entries; i++)
 		node_joined(ls, joined_list[i].nodeid);
 
@@ -1263,11 +1368,21 @@
 	if (locks_compat(waiting_lkb, granted_lkb))
 		return;
 
+	/* this shouldn't happen AFAIK */
+	if (tr == granted_lkb->trans) {
+		log_group(ls, "trans %llx waiting on self",
+			  (unsigned long long)tr->xid);
+		return;
+	}
+
 	/* don't add the same trans to the waitfor list multiple times */
 	if (tr->waitfor_count && in_waitfor(tr, granted_lkb->trans)) {
-		log_group(ls, "trans %llx already waiting for trans %llx",
-			  (unsigned long long)tr->xid,
-			  (unsigned long long)granted_lkb->trans->xid);
+		log_group(ls, "trans %llx already waiting for trans %llx, "
+			  "waiting %x %s, granted %x %s",
+			  (unsigned long long)waiting_lkb->trans->xid,
+			  (unsigned long long)granted_lkb->trans->xid,
+			  waiting_lkb->lock.id, waiting_lkb->rsb->name,
+			  granted_lkb->lock.id, granted_lkb->rsb->name);
 		return;
 	}
 
@@ -1293,6 +1408,7 @@
 	struct dlm_lkb *waiting_lkb, *granted_lkb;
 	struct dlm_rsb *r;
 	struct trans *tr;
+	int depend_count = 0;
 
 	list_for_each_entry(tr, &ls->transactions, list) {
 		list_for_each_entry(waiting_lkb, &tr->locks, trans_list) {
@@ -1307,9 +1423,12 @@
 					continue;
 				/* granted_lkb status is GRANTED or CONVERT */
 				add_waitfor(ls, waiting_lkb, granted_lkb);
+				depend_count++;
 			}
 		}
 	}
+
+	log_group(ls, "create_waitfor_graph: depend_count %d", depend_count);
 }
 
 /* Assume a transaction that's not waiting on any locks will complete, release
@@ -1378,9 +1497,8 @@
 		removed++;
 	}
 
-	if (removed)
-		log_group(ls, "reduce_waitfor_graph: %d blocked, %d removed",
-			  blocked, removed);
+	log_group(ls, "reduce_waitfor_graph: %d blocked, %d removed",
+		  blocked, removed);
 	return removed;
 }
 
@@ -1440,26 +1558,13 @@
 	   should completely reduce */
 }
 
-static char *status_str(int lksts)
-{
-	switch (lksts) {
-	case DLM_LKSTS_WAITING:
-		return "W";
-	case DLM_LKSTS_GRANTED:
-		return "G";
-	case DLM_LKSTS_CONVERT:
-		return "C";
-	}
-	return "?";
-}
-
 static void dump_trans(struct lockspace *ls, struct trans *tr)
 {
 	struct dlm_lkb *lkb;
 	struct trans *wf;
 	int i;
 
-	log_group(ls, "trans %llx waitfor_count %d others_waiting_on_us %d",
+	log_group(ls, "trans xid %llx waitfor_count %d others_waiting_on_us %d",
 		  (unsigned long long)tr->xid, tr->waitfor_count,
 		  tr->others_waiting_on_us);
 
@@ -1492,39 +1597,52 @@
 {
 	struct trans *tr;
 
+	log_group(ls, "Transaction dump:");
+
 	list_for_each_entry(tr, &ls->transactions, list)
 		dump_trans(ls, tr);
 }
 
 static void find_deadlock(struct lockspace *ls)
 {
+	struct node *node;
+
 	if (list_empty(&ls->resources)) {
-		log_group(ls, "no resources no deadlock");
+		log_group(ls, "no deadlock: no resources");
+		return;
+	}
+
+	if (!list_empty(&ls->transactions)) {
+		log_group(ls, "transactions list should be empty");
 		return;
 	}
 
+	dump_resources(ls);
 	create_trans_list(ls);
 	create_waitfor_graph(ls);
-
-	log_group(ls, "created waitfor graph:");
 	dump_all_trans(ls);
-
 	reduce_waitfor_graph_loop(ls);
 
 	if (list_empty(&ls->transactions)) {
-		log_group(ls, "no deadlock");
+		log_group(ls, "no deadlock: all transactions reduced");
 		goto out;
 	}
 
 	log_group(ls, "found deadlock");
 	dump_all_trans(ls);
 
-	cancel_trans(ls);
+	/* TODO: should probably do this above instead */
+	if (ls->low_nodeid != our_nodeid) {
+		log_group(ls, "defer resolution to low nodeid %d",
+			  ls->low_nodeid);
+		goto out;
+	}
 
+	cancel_trans(ls);
 	reduce_waitfor_graph_loop(ls);
 
 	if (list_empty(&ls->transactions)) {
-		log_group(ls, "deadlock resolved with cancel");
+		log_group(ls, "resolved deadlock with cancel");
 		goto out;
 	}
 
@@ -1533,5 +1651,9 @@
 
  out:
 	free_resources(ls);
+	free_transactions(ls);
+
+	list_for_each_entry(node, &ls->nodes, list)
+		node->checkpoint_ready = 0;
 }
 
--- cluster/group/dlm_controld/dlm_daemon.h	2007/08/06 21:50:26	1.11
+++ cluster/group/dlm_controld/dlm_daemon.h	2007/08/10 20:23:07	1.12
@@ -86,6 +86,7 @@
 	struct list_head	list;
 	char			name[MAXNAME+1];
 	uint32_t		global_id;
+	int			low_nodeid;
 	int			joining;
 	int			cpg_ci;
 	cpg_handle_t		cpg_h;
@@ -95,6 +96,7 @@
 	struct list_head	nodes;
 	struct timeval		last_deadlock_check;
 	unsigned int		timewarn_count;
+	int			got_first_confchg;
 };
 
 /* action.c */


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]