[Cluster-devel] cluster cmirror/src/Makefile cmirror/src/clust ...

jbrassow at sourceware.org jbrassow at sourceware.org
Thu Nov 8 22:16:55 UTC 2007


CVSROOT:	/cvs/cluster
Module name:	cluster
Branch: 	RHEL5
Changes by:	jbrassow at sourceware.org	2007-11-08 22:16:53

Modified files:
	cmirror/src    : Makefile cluster.c functions.c local.c 
	cmirror-kernel/src: dm-clog-tfr.c dm-clog-tfr.h 
Added files:
	cmirror/src    : rbtree.c rbtree.h 

Log message:
	- only write the disk log on a 'flush' not every time a mark/clear
	happens
	
	- Add mark request tracking so we don't clear log bits prematurely
	(and to reduce number of disk writes).
	
	- Add priority recovery - regions which are being written to take
	first priority during recovery
	
	- introduction of CPG flow control

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/rbtree.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=NONE&r2=1.1.2.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/rbtree.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=NONE&r2=1.1.2.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/Makefile.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.3.2.3&r2=1.3.2.4
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.4&r2=1.1.2.5
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.3&r2=1.1.2.4
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.3&r2=1.1.2.4
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror-kernel/src/dm-clog-tfr.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror-kernel/src/dm-clog-tfr.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3

--- cluster/cmirror/src/Attic/Makefile	2007/11/03 18:37:48	1.3.2.3
+++ cluster/cmirror/src/Attic/Makefile	2007/11/08 22:16:53	1.3.2.4
@@ -23,6 +23,8 @@
 	echo '-I${incdir}'; else \
 	echo ''; fi)
 
+SOURCES = $(shell ls *.c)
+
 ifneq (${TMP_INCLUDE}, )
 INCLUDE += ${TMP_INCLUDE} -I.
 else
@@ -42,7 +44,8 @@
 
 all: ${TARGET}
 
-clogd: link_mon.c logging.c queues.c local.c cluster.c functions.c clogd.c
+#clogd: rbtree.c link_mon.c logging.c queues.c local.c cluster.c functions.c clogd.c
+clogd: ${SOURCES}
 	${CC} ${CFLAGS} -o $@ $^ ${LDFLAGS}
 
 no_files:
--- cluster/cmirror/src/Attic/cluster.c	2007/11/05 22:44:03	1.1.2.4
+++ cluster/cmirror/src/Attic/cluster.c	2007/11/08 22:16:53	1.1.2.5
@@ -58,6 +58,27 @@
 static struct list_head clog_cpg_list;
 
 /*
+ * flow_control
+ * @handle
+ *
+ * Returns: 1 if flow control needed, 0 otherwise
+ */
+static int flow_control(cpg_handle_t handle)
+{
+	cpg_flow_control_state_t flow_control_state;
+	cpg_error_t error;
+	
+	error = cpg_flow_control_state_get(handle, &flow_control_state);
+	if (error != CPG_OK) {
+		LOG_ERROR("Failed to get flow control state.  Reason: %d", error);
+		/* FIXME: Better error handling */
+		return 0;
+	}
+
+	return (flow_control_state == CPG_FLOW_CONTROL_ENABLED) ? 1 : 0;
+}
+
+/*
  * cluster_send
  * @tfr
  *
@@ -86,6 +107,16 @@
 
 	iov.iov_base = tfr;
 	iov.iov_len = sizeof(struct clog_tfr) + tfr->data_size;
+	while (flow_control(entry->handle)) {
+		/*
+		 * FIXME: Don't need to sleep this long
+		 *
+		 * ... or, we could dispatch the queued messages here.
+		 */
+		LOG_PRINT("Flow control enabled.  Delaying msg [%s]",
+			  RQ_TYPE(tfr->request_type));
+		sleep(1);
+	}
 	r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
 
 	EXIT();
@@ -271,7 +302,7 @@
 	char buf[32];
 
 	ENTER();
-	LOG_PRINT("Sending checkpointed data to %u", cp->requester);
+	LOG_DBG("Sending checkpointed data to %u", cp->requester);
 
 	len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%u", cp->requester);
 	name.length = len;
@@ -540,6 +571,9 @@
 	ENTER();
 	list_for_each_entry_safe(entry, tmp, &clog_cpg_list, list) {
 		r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL);
+		if (r != SA_AIS_OK)
+			LOG_ERROR("cpg_dispatch failed: %d", r);
+
 		for (cp = entry->checkpoint_list; cp;) {
 			LOG_ERROR("Checkpoint data available for node %u",
 				  cp->requester);
@@ -575,7 +609,6 @@
 	int i_am_server;
 	struct clog_tfr *tfr = msg;
 	struct clog_tfr *startup_tfr = NULL;
-	struct clog_tfr *cp_tfr = NULL;
 	struct clog_cpg *match;
 
 	ENTER();
@@ -603,57 +636,31 @@
 	i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
 
 	if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) {
-		/* Redundant checkpoints ignored due to match->valid */
 		if (my_cluster_id == tfr->originator) {
-			switch (import_checkpoint(match, match->valid)) {
-			case 0:
-				if (!match->valid) {
-					LOG_DBG("Checkpoint data recieved.  Log is now valid");
-					match->valid = 1;
-					while ((startup_tfr = queue_remove(match->startup_queue))) {
-						LOG_DBG("Processing delayed request %d: %s",
-							match->startup_queue->count,
-							RQ_TYPE(startup_tfr->request_type));
-						r = handle_cluster_request(startup_tfr, i_am_server);
-
-						if (r) {
-							LOG_ERROR("Error while processing delayed CPG message");
-							goto out;
-						} else {
-							queue_add(startup_tfr, free_queue);
-						}
-					}
-				}
-
-				break;
-			case -EAGAIN:
-				LOG_PRINT("Checkpoint data empty.  Requesting new checkpoint.");
-				
-				cp_tfr = queue_remove(free_queue);
-				if (!cp_tfr) {
-					/* FIXME: better error handling */
-					LOG_ERROR("No clog_tfr struct available");
-					goto out;
-				}
-				memset(cp_tfr, 0, sizeof(*cp_tfr));
-				cp_tfr->request_type = DM_CLOG_CHECKPOINT_REQUEST;
-
-				cp_tfr->originator = my_cluster_id;
-
-				strncpy(cp_tfr->uuid, tfr->uuid, CPG_MAX_NAME_LENGTH);
-
-				if ((r = cluster_send(cp_tfr))) {
-					/* FIXME: better error handling */
-					LOG_ERROR("Failed to send checkpoint ready notice");
-					queue_add(cp_tfr, free_queue);
-					goto out;
-				}
-				queue_add(cp_tfr, free_queue);
-
-				break;
-			default:
+			/* Redundant checkpoints ignored if match->valid */
+			if (import_checkpoint(match, match->valid)) {
 				LOG_ERROR("Failed to import checkpoint");
 				/* Could we retry? */
+				goto out;
+			} else if (!match->valid) {
+				LOG_DBG("Checkpoint data recieved.  Log is now valid");
+				match->valid = 1;
+				while ((startup_tfr = queue_remove(match->startup_queue))) {
+					LOG_DBG("Processing delayed request %d: %s",
+						match->startup_queue->count,
+						RQ_TYPE(startup_tfr->request_type));
+					r = handle_cluster_request(startup_tfr, i_am_server);
+
+					if (r) {
+						LOG_ERROR("Error while processing delayed CPG message");
+						/*
+						 * FIXME: If we error out here, we will never get
+						 * another opportunity to retry these requests
+						 */
+						goto out;
+					}
+					queue_add(startup_tfr, free_queue);
+				}
 			}
 		}
 		goto out;
@@ -760,8 +767,8 @@
 	    (member_list_entries == 1) && (joined_list_entries == 1) &&
 	    (member_list[0].nodeid == joined_list[0].nodeid)) {
 		match->lowest_id = my_cluster_id = joined_list[0].nodeid;
-		LOG_PRINT("I am the log server (and first to join) for %s",
-			  match->name.value);
+		LOG_DBG("I am the log server (and first to join) for %s",
+			match->name.value);
 		match->valid = 1;
 		goto out;
 	}
@@ -789,7 +796,7 @@
 		if (match->lowest_id > member_list[i].nodeid)
 			match->lowest_id = member_list[i].nodeid;
 
-	LOG_PRINT("Server is now %u", match->lowest_id);
+	LOG_DBG("Server is now %u", match->lowest_id);
 
 	/*
 	 * If I am part of the joining list, I do not send checkpoints
--- cluster/cmirror/src/Attic/functions.c	2007/11/05 22:44:03	1.1.2.3
+++ cluster/cmirror/src/Attic/functions.c	2007/11/08 22:16:53	1.1.2.4
@@ -15,6 +15,7 @@
 #include "common.h"
 #include "cluster.h"
 #include "logging.h"
+#include "rbtree.h"
 
 #define BYTE_SHIFT 3
 
@@ -27,6 +28,14 @@
 #define MIRROR_DISK_VERSION 2
 #define LOG_OFFSET 2
 
+/*
+ * Used by the 'touched' variable, these macros mean:
+ *   LOG_CHANGED - bits in the in-memory log have changed
+ *   LOG_FLUSH   - log must be committed to disk
+ */
+#define LOG_CHANGED 1
+#define LOG_FLUSH   2
+
 struct log_header {
         uint32_t magic;
         uint32_t version;
@@ -58,6 +67,9 @@
 
 	uint32_t state;         /* current operational state of the log */
 
+	struct rb_tree mark_tree; /* Tree that tracks all mark requests */
+	struct recovery_request *recovery_request_list;
+
 	int disk_fd;            /* -1 means no disk log */
 	int log_dev_failed;
 	uint64_t disk_nr_regions;
@@ -65,9 +77,20 @@
 	void *disk_buffer;      /* aligned memory for O_DIRECT */
 };
 
+struct mark_entry {
+	uint32_t nodeid;
+	uint64_t region;
+};
+
+struct recovery_request {
+	uint64_t region;
+	struct recovery_request *next;
+};
+
 static struct list_head log_list = LIST_HEAD_INIT(log_list);
 static struct list_head log_pending_list = LIST_HEAD_INIT(log_pending_list);
 
+
 static int log_test_bit(uint32_t *bs, unsigned bit)
 {
 	return ext2fs_test_bit(bit, (unsigned int *) bs) ? 1 : 0;
@@ -76,13 +99,13 @@
 static void log_set_bit(struct log_c *lc, uint32_t *bs, unsigned bit)
 {
 	ext2fs_set_bit(bit, (unsigned int *) bs);
-	lc->touched = 1;
+	lc->touched |= LOG_CHANGED;
 }
 
 static void log_clear_bit(struct log_c *lc, uint32_t *bs, unsigned bit)
 {
 	ext2fs_clear_bit(bit, (unsigned int *) bs);
-	lc->touched = 1;
+	lc->touched |= LOG_CHANGED;
 }
 
 /* FIXME: Why aren't count and start the same type? */
@@ -262,6 +285,47 @@
 	return 0;
 }
 
+static void *get_mark_entry_region(void *data)
+{
+	struct mark_entry *m = data;
+
+	return (void *)&m->region;
+}
+
+static int cmp_mark_entry_regions(void *a, void *b)
+{
+	uint64_t _a = *((uint64_t *)a);
+	uint64_t _b = *((uint64_t *)b);
+
+	return (_a == _b) ? 0 : (_a < _b) ? -1 : 1;
+}
+
+/*
+ * srsm_count - Same Region, Same Machine count
+ * @data - data held in the tree node (a mark_entry ptr)
+ * @adata - additional data passed in (nodeid)
+ *
+ * This function always returns 1 - allowing the RBT search
+ * to continue finding additional matches.  Its useful
+ * feature is that it counts all the tree nodes that match
+ * the given machine (not just the region).  The result of the
+ * count is placed in 'srsm_count_var'.
+ *
+ * Returns: 1
+ */
+static int srsm_count_var = 0;
+static int srsm_count(void *data, void *adata)
+{
+	uint32_t nodeid = *((uint32_t *)adata);
+	struct mark_entry *m = data;
+
+	if (nodeid == m->nodeid)
+		srsm_count_var++;
+
+	return 1;
+}
+
+
 static int find_disk_path(char *major_minor_str, char *path_rtn)
 {
 	int r;
@@ -394,7 +458,11 @@
 	lc->disk_fd = -1;
 	lc->log_dev_failed = 0;
 
-	lc->bitset_uint32_count = region_count / (sizeof(*lc->clean_bits) << BYTE_SHIFT);
+	rbt_init(&lc->mark_tree, sizeof(struct mark_entry),
+		 get_mark_entry_region, cmp_mark_entry_regions);
+
+	lc->bitset_uint32_count = region_count / 
+		(sizeof(*lc->clean_bits) << BYTE_SHIFT);
 	if (region_count % (sizeof(*lc->clean_bits) << BYTE_SHIFT))
 		lc->bitset_uint32_count++;
 
@@ -622,7 +690,7 @@
 		LOG_DBG("Non-master resume: bits pre-loaded");
 		lc->resume_override = 1000;
 		lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count);
-		LOG_DBG("sync_count = %llu", lc->sync_count);
+		LOG_DBG("Initial sync_count = %llu", lc->sync_count);
 		goto out;
 		return 0;
 	default:
@@ -640,9 +708,9 @@
 	switch (tfr->error) {
 	case 0:
 		if (lc->disk_nr_regions < lc->region_count)
-			LOG_PRINT("Mirror has grown, updating log bits");
+			LOG_DBG("Mirror has grown, updating log bits");
 		else if (lc->disk_nr_regions > lc->region_count)
-			LOG_PRINT("Mirror has shrunk, updating log bits");
+			LOG_DBG("Mirror has shrunk, updating log bits");
 		break;		
 	case -EINVAL:
 		LOG_DBG("Read log failed: not yet initialized");
@@ -669,14 +737,14 @@
 	/* copy clean across to sync */
 	memcpy(lc->sync_bits, lc->clean_bits, size);
 	lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count);
-	LOG_DBG("sync_count = %llu", lc->sync_count);
+	LOG_DBG("Initial sync_count = %llu", lc->sync_count);
 	lc->sync_search = 0;
 
 	/*
-	 * We mark 'touched' so that only the master commits
+	 * We mark 'touched' as LOG_FLUSH so that only the master commits
 	 * the log via 'commit_log'
 	 */
-	lc->touched = 1;
+	lc->touched = LOG_FLUSH;
 out:
 	lc->state = LOG_RESUMED;
 	
@@ -789,6 +857,11 @@
 		return -EINVAL;
 
 	*rtn = log_test_bit(lc->sync_bits, region);
+	if (*rtn)
+		LOG_DBG("  Region is in-sync: %llu", region);
+	else
+		LOG_DBG("  Region is not in-sync: %llu", region);
+
 	tfr->data_size = sizeof(*rtn);
 
 	return 0;
@@ -814,7 +887,61 @@
 	    !log_test_bit(lc->clean_bits, lc->recovering_region))
 		return -EAGAIN;
 
-	/* Actual disk flush happens in 'commit_log()' */
+	/* 
+	 * Actual disk flush happens in 'commit_log()'
+	 * Clear LOG_CHANGED and set LOG_FLUSH
+	 */
+	lc->touched = LOG_FLUSH;
+
+	return 0;
+}
+
+/*
+ * mark_region
+ * @lc
+ * @region
+ * @who
+ *
+ * Put a mark region request in the tree for tracking.
+ *
+ * Returns: 0 on success, -EXXX on error
+ */
+static int mark_region(struct log_c *lc, uint64_t region, uint32_t who)
+{
+	struct rb_node *new;
+	struct mark_entry *m;
+
+	/*
+	 * The search will find every node in the tree that has
+	 * the same region marked.  The additional function
+	 * passed to 'rbt_search_plus' detects if the matching
+	 * nodes also are from the machine who is performing this
+	 * request.
+	 */
+	srsm_count_var = 0;
+	if (!rbt_search_plus(&lc->mark_tree, &region, srsm_count, &who))
+		log_clear_bit(lc, lc->clean_bits, region);
+
+	/* Requesting region/nodeid is already in the tree */
+	if (srsm_count_var)
+		return 0;
+
+	/*
+	 * Save allocation until here - if there is a failure,
+	 * at least we have cleared the bit.
+	 */
+	new = rbt_alloc_node(&lc->mark_tree);
+	if (!new) {
+		LOG_ERROR("Unable to allocate space for mark_entry: %llu/%u",
+			  region, who);
+		return -ENOMEM;
+	}
+
+	m = new->rb_data;
+	m->nodeid = who;
+	m->region = region;
+	rbt_insert(&lc->mark_tree, new);
+
 	return 0;
 }
 
@@ -827,9 +954,9 @@
  *
  * Returns: 0 on success, -EXXX on failure
  */
-static void print_bits(char *buf, int size);
 static int clog_mark_region(struct clog_tfr *tfr)
 {
+	int r;
 	int count;
 	uint64_t *region;
 	struct log_c *lc = get_log(tfr->uuid);
@@ -845,14 +972,58 @@
 	count = tfr->data_size / sizeof(uint64_t);
 	region = (uint64_t *)&tfr->data;
 
-	for (; count > 0; count--, region++)
-		log_clear_bit(lc, lc->clean_bits, *region);
+	for (; count > 0; count--, region++) {
+		r = mark_region(lc, *region, tfr->originator);
+		if (r)
+			return r;
+	}
 
 	tfr->data_size = 0;
 
 	return 0;
 }
 
+static int clear_region(struct log_c *lc, uint64_t region, uint32_t who)
+{
+	int set_bit = 1;
+	struct rb_node *mark_list;
+	struct mark_entry *m;
+
+	srsm_count_var = 0;
+	mark_list = rbt_search_plus(&lc->mark_tree, &region, srsm_count, &who);
+	if (!mark_list || !srsm_count_var) {
+		LOG_DBG("Clear issued on region that is not marked: %llu/%u",
+			region, who);
+		goto set_bit;
+	}
+
+	/* If rb_next is set, it means more than one node has this marked */
+	if (mark_list->rb_next)
+		set_bit = 0;
+
+	/* Must find this machine's entry to remove it */
+	for (; mark_list; mark_list = mark_list->rb_next) {
+		m = mark_list->rb_data;
+		if (m->nodeid == who)
+			break;
+	}
+
+	if (!mark_list) {
+		LOG_ERROR("Bad programming: searches disagree on results");
+		goto set_bit;
+	}
+
+	rbt_remove(&lc->mark_tree, mark_list);
+	rbt_free_node(&lc->mark_tree, mark_list);
+
+set_bit:
+	/* Only clear the region if it is also in-sync */
+	if (set_bit && log_test_bit(lc->sync_bits, region))
+		log_set_bit(lc, lc->clean_bits, region);
+
+	return 0;
+}
+
 /*
  * clog_clear_region
  * @tfr
@@ -864,6 +1035,7 @@
  */
 static int clog_clear_region(struct clog_tfr *tfr)
 {
+	int r;
 	int count;
 	uint64_t *region;
 	struct log_c *lc = get_log(tfr->uuid);
@@ -879,8 +1051,12 @@
 	count = tfr->data_size / sizeof(uint64_t);
 	region = (uint64_t *)&tfr->data;
 
-	for (; count > 0; count--, region++)
-		log_set_bit(lc, lc->clean_bits, *region);
+	for (; count > 0; count--, region++) {
+		r = clear_region(lc, *region, tfr->originator);
+		if (r)
+			return r;
+	}
+
 	tfr->data_size = 0;
 
 	return 0;
@@ -918,6 +1094,28 @@
 		return 0;
 	}
 
+	while (lc->recovery_request_list) {
+		struct recovery_request *del;
+
+		del = lc->recovery_request_list;
+		lc->recovery_request_list = del->next;
+
+		pkg->r = del->region;
+		free(del);
+
+		if (!log_test_bit(lc->sync_bits, pkg->r)) {
+			LOG_DBG("Assigning priority resync work to %u: %llu",
+				tfr->originator, pkg->r);
+#ifdef DEBUG
+			LOG_DBG("Priority work remaining:");
+			for (del = lc->recovery_request_list; del; del = del->next)
+				LOG_DBG("  %llu", del->region);
+#endif			
+			pkg->i = 1;
+			return 0;
+		}
+	}
+
 	pkg->r = find_next_zero_bit(lc->sync_bits,
 				    lc->region_count,
 				    lc->sync_search);
@@ -973,7 +1171,6 @@
 		return -EINVAL;
 
 	*sync_count = lc->sync_count;
-	LOG_DBG("sync_count = %llu", *sync_count);
 
 	tfr->data_size = sizeof(*sync_count);
 
@@ -1100,6 +1297,20 @@
 		return -EINVAL;
 
 	*rtn = !log_test_bit(lc->sync_bits, region);
+	if (*rtn) {
+		struct recovery_request *rr;
+		LOG_DBG("  Region is busy recovering: %llu", region);
+
+		/* Failure to allocate simply means we can't prioritize it */
+		rr = malloc(sizeof(*rr));
+		if (rr) {
+			rr->region = region;
+			rr->next = lc->recovery_request_list;
+			lc->recovery_request_list = rr;
+		}
+	} else
+		LOG_DBG("  Region is not recovering: %llu", region);
+
 	tfr->data_size = sizeof(*rtn);
 
 	return 0;	
@@ -1228,7 +1439,7 @@
 		goto out;
 	}
 
-	if (!lc->touched)
+	if (!(lc->touched & LOG_FLUSH))
 		goto out;
 
 	if (lc->disk_fd >= 0) {
@@ -1240,7 +1451,10 @@
 		LOG_DBG("Disk log written");
 	}
 
-	lc->touched = 0;
+	if (lc->touched & LOG_CHANGED)
+		LOG_ERROR("WARNING:  Log has changed during a flush operation");
+
+	lc->touched &= ~LOG_FLUSH;
 
 	/* FIXME: unlock */
 out:
@@ -1292,11 +1506,11 @@
 
 	if (!strncmp(which, "sync_bits", 9)) {
 		memcpy(*buf, lc->sync_bits, bitset_size);
-		LOG_PRINT("storing sync_bits:");
+		LOG_DBG("storing sync_bits:");
 		print_bits(*buf, bitset_size);
 	} else if (!strncmp(which, "clean_bits", 9)) {
 		memcpy(*buf, lc->clean_bits, bitset_size);
-		LOG_PRINT("storing clean_bits:");
+		LOG_DBG("storing clean_bits:");
 		print_bits(*buf, bitset_size);
 	}
 
--- cluster/cmirror/src/Attic/local.c	2007/11/05 22:44:03	1.1.2.3
+++ cluster/cmirror/src/Attic/local.c	2007/11/08 22:16:53	1.1.2.4
@@ -30,12 +30,14 @@
 
 	r = recv(cn_fd, buf, sizeof(buf), 0);
 	if (r < 0) {
+		LOG_ERROR("Failed to recv message from kernel");
 		r = -errno;
 		goto out;
 	}
 
 	switch (((struct nlmsghdr *)buf)->nlmsg_type) {
 	case NLMSG_ERROR:
+		LOG_ERROR("Unable to recv message from kernel: NLMSG_ERROR");
 		r = -EBADE;
 		break;
 	case NLMSG_DONE:
@@ -77,8 +79,10 @@
 	 * A failure to allocate space means the request is lost
 	 * The kernel must retry
 	 */
-	if (!(*tfr = queue_remove(free_queue)))
+	if (!(*tfr = queue_remove(free_queue))) {
+		LOG_ERROR("Failed to get clog_tfr from free_queue");
 		return -ENOMEM;
+	}
 
 	memset(*tfr, 0, sizeof(struct clog_tfr));
 
--- cluster/cmirror-kernel/src/dm-clog-tfr.c	2007/08/30 15:49:32	1.1.2.2
+++ cluster/cmirror-kernel/src/dm-clog-tfr.c	2007/11/08 22:16:53	1.1.2.3
@@ -81,6 +81,11 @@
 	}
 
 	list_for_each_entry(pkg, &recieving_list, list) {
+		/*
+		DMINFO("Msg from userspace recieved [%s].", RQ_TYPE(tfr->request_type));
+		DMINFO("  Seq # recieved: %llu    Seq # wanted: %llu",
+		       pkg->seq, tfr->seq);
+		*/
 		if (tfr->seq == pkg->seq) {
 			if (tfr->data_size > *(pkg->data_size)) {
 				DMERR("Insufficient space to recieve package [%s]",
--- cluster/cmirror-kernel/src/dm-clog-tfr.h	2007/11/03 18:37:48	1.1.2.2
+++ cluster/cmirror-kernel/src/dm-clog-tfr.h	2007/11/08 22:16:53	1.1.2.3
@@ -63,7 +63,7 @@
 };
 
 #ifdef __KERNEL__
-#define DM_MSG_PREFIX "clulog"
+#define DM_MSG_PREFIX "dm-log-clustered"
 
 int dm_clog_tfr_init(void);
 void dm_clog_tfr_exit(void);




More information about the Cluster-devel mailing list