[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Cluster-devel] cluster/cmirror/src cluster.c functions.c func ...



CVSROOT:	/cvs/cluster
Module name:	cluster
Branch: 	RHEL5
Changes by:	jbrassow sourceware org	2007-11-05 22:44:04

Modified files:
	cmirror/src    : cluster.c functions.c functions.h link_mon.c 
	                 local.c 

Log message:
	- Fix problem with recovery work assignment (still need to add
	priority recovery... otherwise, I/O will stall for long periods
	during mirror resync)
	
	- Clean up the checkpointing code and fix a couple of bugs there that
	prevented proper start-up.

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.3&r2=1.1.2.4
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/link_mon.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3

--- cluster/cmirror/src/Attic/cluster.c	2007/11/03 18:53:03	1.1.2.3
+++ cluster/cmirror/src/Attic/cluster.c	2007/11/05 22:44:03	1.1.2.4
@@ -49,6 +49,8 @@
 	int valid;
 	struct queue *startup_queue;
 
+	int checkpoints_needed;
+	uint32_t checkpoint_requesters[10];
 	struct checkpoint_data *checkpoint_list;
 };
 
@@ -384,7 +386,7 @@
 	return 0;
 }
 
-static int import_checkpoint(struct clog_cpg *entry)
+static int import_checkpoint(struct clog_cpg *entry, int no_read)
 {
 	int rtn = 0;
 	SaCkptCheckpointHandleT h;
@@ -422,6 +424,11 @@
 
 	saCkptCheckpointUnlink(ckpt_handle, &name);
 
+	if (no_read) {
+		LOG_DBG("Checkpoint for this log already recieved");
+		goto no_read;
+	}
+
 init_retry:
 	rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, 0, &itr);
 	if (rv == SA_AIS_ERR_TRY_AGAIN) {
@@ -516,7 +523,7 @@
 
 fail:
 	saCkptSectionIterationFinalize(itr);
-
+no_read:
 	saCkptCheckpointClose(h);
 
 	free(bitmap);
@@ -542,11 +549,18 @@
 			 * notice in tfr in export_checkpoint function
 			 * by setting tfr->error
 			 */
-			export_checkpoint(cp);
-
-			entry->checkpoint_list = cp->next;
-			free_checkpoint(cp);
-			cp = entry->checkpoint_list;
+			switch (export_checkpoint(cp)) {
+			case -EEXIST:
+				LOG_DBG("Checkpoint already handled by someone else");
+			case 0:
+				entry->checkpoint_list = cp->next;
+				free_checkpoint(cp);
+				cp = entry->checkpoint_list;
+				break;
+			default:
+				/* FIXME: Skipping will cause list corruption */
+				LOG_ERROR("Failed to export checkpoint");
+			}
 		}
 	}
 	EXIT();
@@ -556,7 +570,9 @@
 static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
 				 uint32_t nodeid, uint32_t pid, void *msg, int msg_len)
 {
+	int i;
 	int r = 0;
+	int i_am_server;
 	struct clog_tfr *tfr = msg;
 	struct clog_tfr *startup_tfr = NULL;
 	struct clog_tfr *cp_tfr = NULL;
@@ -567,9 +583,6 @@
 	if (msg_len != (sizeof(*tfr) + tfr->data_size))
 		LOG_ERROR("Badly sized message recieved from cluster.");
 
-	LOG_DBG("Message (len = %d) from node/pid %u/%d", msg_len,
-		  nodeid, pid);
-
 	if (tfr->request_type & DM_CLOG_RESPONSE)
 		LOG_DBG("Response from cluster recieved %s",
 			RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE));
@@ -587,13 +600,31 @@
 		LOG_ERROR("Unable to find clog_cpg for cluster message");
 		goto out;
 	}
+	i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
 
 	if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) {
-		if ((!match->valid) && (my_cluster_id == tfr->originator)) {
-			switch (import_checkpoint(match)) {
+		/* Redundant checkpoints ignored due to match->valid */
+		if (my_cluster_id == tfr->originator) {
+			switch (import_checkpoint(match, match->valid)) {
 			case 0:
-				LOG_DBG("Checkpoint data recieved.  Log is now valid");
-				match->valid = 1;
+				if (!match->valid) {
+					LOG_DBG("Checkpoint data recieved.  Log is now valid");
+					match->valid = 1;
+					while ((startup_tfr = queue_remove(match->startup_queue))) {
+						LOG_DBG("Processing delayed request %d: %s",
+							match->startup_queue->count,
+							RQ_TYPE(startup_tfr->request_type));
+						r = handle_cluster_request(startup_tfr, i_am_server);
+
+						if (r) {
+							LOG_ERROR("Error while processing delayed CPG message");
+							goto out;
+						} else {
+							queue_add(startup_tfr, free_queue);
+						}
+					}
+				}
+
 				break;
 			case -EAGAIN:
 				LOG_PRINT("Checkpoint data empty.  Requesting new checkpoint.");
@@ -628,28 +659,28 @@
 		goto out;
 	}
 
-	if (tfr->request_type == DM_CLOG_CHECKPOINT_REQUEST) {
-		if (tfr->originator == my_cluster_id) {
-			/*
-			 * The checkpoint includes any request up to the
-			 * request for checkpoint.  So, we must clear any
-			 * previous requests we were storing.
-			 */
-			while ((startup_tfr = queue_remove(match->startup_queue)))
-				queue_add(startup_tfr, free_queue);
-		} else if (my_cluster_id == match->lowest_id) {
-			struct checkpoint_data *new;
-
-			new = prepare_checkpoint(match, tfr->originator);
-			if (!new) {
-				/* FIXME: Need better error handling */
-				LOG_ERROR("Failed to prepare checkpoint!!!");
-				goto out;
-			}
-			new->next = match->checkpoint_list;
-			match->checkpoint_list = new;
+	/*
+	 * If the log is now valid, we can queue the checkpoints
+	 */
+	for (i = match->checkpoints_needed; i;) {
+		struct checkpoint_data *new;
+
+		i--;
+		if (log_get_state(tfr) != LOG_RESUMED) {
+			LOG_DBG("Skipping checkpoint for %u, my log is not ready",
+				match->checkpoint_requesters[i]);
+			continue;
 		}
-		goto out;
+		new = prepare_checkpoint(match, match->checkpoint_requesters[i]);
+		if (!new) {
+			/* FIXME: Need better error handling */
+			LOG_ERROR("Failed to prepare checkpoint for %u!!!",
+				  match->checkpoint_requesters[i]);
+			break;
+		}
+		match->checkpoints_needed = i;
+		new->next = match->checkpoint_list;
+		match->checkpoint_list = new;
 	}
 
 	if (tfr->request_type & DM_CLOG_RESPONSE)
@@ -665,21 +696,7 @@
 			goto out;
 		}
 
-		while ((startup_tfr = queue_remove(match->startup_queue))) {
-			LOG_DBG("Processing delayed request %d: %s",
-				match->startup_queue->count,
-				RQ_TYPE(startup_tfr->request_type));
-			r = handle_cluster_request(startup_tfr,
-						   (my_cluster_id == match->lowest_id) ? 1 : 0);
-			if (r) {
-				LOG_ERROR("Error while processing delayed CPG message");
-				goto out;
-			} else {
-				queue_add(startup_tfr, free_queue);
-			}
-		}
-
-		r = handle_cluster_request(tfr, (my_cluster_id == match->lowest_id) ? 1 : 0);
+		r = handle_cluster_request(tfr, i_am_server);
 	}
 
 out:
@@ -694,16 +711,14 @@
 				struct cpg_address *left_list, int left_list_entries,
 				struct cpg_address *joined_list, int joined_list_entries)
 {
-	int i;
+	int i, j;
 	int my_pid = getpid();
 	int found = 0;
-	int i_was_server = 0;
-	int do_checkpoint = 0;
 	struct clog_cpg *match, *tmp;
 
 	ENTER();
 
-	LOG_PRINT("* CPG config callback *********************");
+	LOG_PRINT("****** CPG config callback ****************");
 
 	LOG_PRINT("* JOINING (%d):", joined_list_entries);
 	for (i = 0; i < joined_list_entries; i++)
@@ -751,6 +766,7 @@
 		goto out;
 	}
 
+	/* Assign my_cluster_id */
 	if (my_cluster_id == 0xDEAD) {
 		for (i = 0; i < joined_list_entries; i++) {
 			LOG_DBG("My pid = %d\t\t[%u/%d]", my_pid,
@@ -764,42 +780,30 @@
 				LOG_PRINT("Setting my cluster id: %u", my_cluster_id);
 			}
 		}
-	} else if (match->lowest_id == my_cluster_id) {
-		LOG_DBG("I was the server (lowest_id = %u, my_cluster_id = %u)",
-			  match->lowest_id, my_cluster_id);
-		i_was_server = 1;
+		goto out;
 	}
 
+	/* Find the lowest_id, i.e. the server */
 	for (i = 0, match->lowest_id = member_list[0].nodeid;
 	     i < member_list_entries; i++)
 		if (match->lowest_id > member_list[i].nodeid)
 			match->lowest_id = member_list[i].nodeid;
 
-	if (match->lowest_id == my_cluster_id) {
-		/* I am the server now */
-		if (!i_was_server)
-			LOG_PRINT("I am the new log server for %s", match->name.value);
-		else if (joined_list_entries) {
-			LOG_PRINT("I must send checkpoint data.");
-			do_checkpoint = 1;
-		}
-	} else if (i_was_server && joined_list_entries) {
-		LOG_PRINT("Giving server ownership to %u", match->lowest_id);
-		LOG_PRINT("I must send checkpoint data.");
-		do_checkpoint = 1;
-	}
+	LOG_PRINT("Server is now %u", match->lowest_id);
 
-	if (do_checkpoint) {
-		struct checkpoint_data *new;
+	/*
+	 * If I am part of the joining list, I do not send checkpoints
+	 * FIXME: What are the cases where multiple nodes can join?
+	 */
+	for (i = 0; i < joined_list_entries; i++)
+		if (joined_list[i].nodeid == my_cluster_id)
+			goto out;
 
-		for (i = 0; i < joined_list_entries; i++) {
-			new = prepare_checkpoint(match, joined_list[i].nodeid);
-			if (!new)
-				goto out;
-			new->next = match->checkpoint_list;
-			match->checkpoint_list = new;
-		}
+	for (i = 0, j = match->checkpoints_needed; i < joined_list_entries; i++) {
+		LOG_DBG("Joining node, %u needs checkpoint", joined_list[i].nodeid);
+		match->checkpoint_requesters[i + j] = joined_list[i].nodeid;
 	}
+	match->checkpoints_needed += i;
 
 out:
 	EXIT();
--- cluster/cmirror/src/Attic/functions.c	2007/11/03 18:37:48	1.1.2.2
+++ cluster/cmirror/src/Attic/functions.c	2007/11/05 22:44:03	1.1.2.3
@@ -56,6 +56,8 @@
                 FORCESYNC,      /* Force a sync to happen */
         } sync;
 
+	uint32_t state;         /* current operational state of the log */
+
 	int disk_fd;            /* -1 means no disk log */
 	int log_dev_failed;
 	uint64_t disk_nr_regions;
@@ -563,6 +565,8 @@
 	if (lc->touched)
 		LOG_DBG("WARNING: log still marked as 'touched' during suspend");
 
+	lc->state = LOG_SUSPENDED;
+
 	return 0;
 }
 
@@ -583,12 +587,12 @@
 }
 
 /*
- * _clog_resume
+ * clog_resume
  * @tfr
  *
  * Does the main work of resuming.
  */
-static int _clog_resume(struct clog_tfr *tfr)
+static int clog_resume(struct clog_tfr *tfr)
 {
 	uint32_t i;
 	struct log_c *lc = get_log(tfr->uuid);
@@ -617,6 +621,9 @@
 	case 3:
 		LOG_DBG("Non-master resume: bits pre-loaded");
 		lc->resume_override = 1000;
+		lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count);
+		LOG_DBG("sync_count = %llu", lc->sync_count);
+		goto out;
 		return 0;
 	default:
 		LOG_ERROR("Error:: multiple loading of bits (%d)", lc->resume_override);
@@ -662,36 +669,29 @@
 	/* copy clean across to sync */
 	memcpy(lc->sync_bits, lc->clean_bits, size);
 	lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count);
+	LOG_DBG("sync_count = %llu", lc->sync_count);
 	lc->sync_search = 0;
 
 	/*
-	tfr->error = write_log(lc);
-	if (tfr->error) {
-		lc->log_dev_failed = 1;
-		LOG_ERROR("Failed to write initial disk log");
-	} else
-		lc->log_dev_failed = 0;
-	*/
-	/*
 	 * We mark 'touched' so that only the master commits
 	 * the log via 'commit_log'
 	 */
 	lc->touched = 1;
-
+out:
+	lc->state = LOG_RESUMED;
+	
 	return tfr->error;
 }
 
 /*
- * clog_resume
+ * local_resume
  * @tfr
  *
  * If the log is pending, we must first join the cpg and
  * put the log in the official list.
  *
- * If the log is in the official list, then we call
- * _clog_resume.
  */
-static int clog_resume(struct clog_tfr *tfr)
+int local_resume(struct clog_tfr *tfr)
 {
 	int r;
 	struct log_c *lc = get_log(tfr->uuid);
@@ -714,12 +714,9 @@
 		/* move log to official list */
 		list_del_init(&lc->list);
 		list_add(&lc->list, &log_list);
-
-		return 0;
 	}
 
-	/* log is in the official list, try to resume */
-	return _clog_resume(tfr);
+	return 0;
 }
 
 /*
@@ -896,7 +893,6 @@
  */
 static int clog_get_resync_work(struct clog_tfr *tfr)
 {
-	int sync_search, conflict=0;
 	struct {int i; uint64_t r; } *pkg = (void *)tfr->data;
 	struct log_c *lc = get_log(tfr->uuid);
 
@@ -905,42 +901,35 @@
 
 	tfr->data_size = sizeof(*pkg);
 
-	/*
-	 * Check if we are done recovering, or if there
-	 * is already someone recovering.
-	 */
-	if ((lc->sync_search >= lc->region_count) ||
-	    (lc->recovering_region != (uint64_t)-1)) {
+	if (lc->sync_search >= lc->region_count) {
+		/*
+		 * FIXME: handle intermittent errors during recovery
+		 * by resetting sync_search... but not to many times.
+		 */
+		LOG_DBG("  Recovery has finished");
 		pkg->i = 0;
 		return 0;
 	}
 
-	for (sync_search = lc->sync_search;
-	     sync_search < lc->region_count;
-	     sync_search += (pkg->r + 1)) {
-		pkg->r = find_next_zero_bit(lc->sync_bits,
-					    lc->region_count,
-					    sync_search);
-
-		/*
-		 * If the region is currently marked, we cannot
-		 * recover it yet.
-		 */
-		if (!log_test_bit(lc->clean_bits, pkg->r))
-			conflict = 1;
-		else
-			break;
+	if (lc->recovering_region != (uint64_t)-1) {
+		LOG_DBG("Someone is already recovering region %Lu",
+			lc->recovering_region);
+		pkg->i = 0;
+		return 0;
 	}
 
-	if (!conflict)
-		lc->sync_search = pkg->r + 1;
+	pkg->r = find_next_zero_bit(lc->sync_bits,
+				    lc->region_count,
+				    lc->sync_search);
 
 	if (pkg->r >= lc->region_count) {
 		pkg->i = 0;
 		return 0;
 	}
 
-	LOG_DBG("Assigning resync work: region = %llu\n", pkg->r);
+	lc->sync_search = pkg->r + 1;
+
+	LOG_DBG("  Assigning resync work: region = %llu\n", pkg->r);
 	pkg->i = 1;
 	return 0;
 }
@@ -984,6 +973,8 @@
 		return -EINVAL;
 
 	*sync_count = lc->sync_count;
+	LOG_DBG("sync_count = %llu", *sync_count);
+
 	tfr->data_size = sizeof(*sync_count);
 
 	return 0;
@@ -1347,6 +1338,17 @@
 	return 0;
 }
 
+int log_get_state(struct clog_tfr *tfr)
+{
+	struct log_c *lc;
+
+	lc = get_log(tfr->uuid);
+	if (!lc)
+		return -EINVAL;
+
+	return lc->state;
+}
+
 int log_status(void)
 {
 	int found = 0;
@@ -1380,3 +1382,4 @@
 	}
 	return found;
 }
+
--- cluster/cmirror/src/Attic/functions.h	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/functions.h	2007/11/05 22:44:03	1.1.2.2
@@ -3,9 +3,14 @@
 
 #include <linux/dm-clog-tfr.h>
 
+#define LOG_RESUMED   1
+#define LOG_SUSPENDED 2
+
+int local_resume(struct clog_tfr *tfr);
 int do_request(struct clog_tfr *tfr);
 int commit_log(struct clog_tfr *tfr);
 int store_bits(const char *uuid, const char *which, char **buf);
 int load_bits(const char *uuid, const char *which, char *buf, int size);
+int log_get_state(struct clog_tfr *tfr);
 int log_status(void);
 #endif /* __CLOG_FUNCTIONS_DOT_H__ */
--- cluster/cmirror/src/Attic/link_mon.c	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/link_mon.c	2007/11/05 22:44:03	1.1.2.2
@@ -102,8 +102,6 @@
 	/* FIXME: handle POLLHUP */
 	for (i = 0; i < used_pfds; i++)
 		if (pfds[i].revents & POLLIN) {
-			LOG_DBG("Data waiting on file descriptor, %d.",
-				pfds[i].fd);
 			/* FIXME: Add this back return 1;*/
 			r++;
 		}
--- cluster/cmirror/src/Attic/local.c	2007/11/03 18:37:48	1.1.2.2
+++ cluster/cmirror/src/Attic/local.c	2007/11/05 22:44:03	1.1.2.3
@@ -170,7 +170,7 @@
 		 * component to join the CPG, and a cluster component
 		 * to handle the request.
 		 */
-		r = do_request(tfr);
+		r = local_resume(tfr);
 		if (r) {
 			LOG_DBG("Returning failed request to kernel [%s]",
 				RQ_TYPE(tfr->request_type));


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]