[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Cluster-devel] cluster/cmirror/src cluster.c cluster.h functi ...



CVSROOT:	/cvs/cluster
Module name:	cluster
Branch: 	RHEL5
Changes by:	jbrassow sourceware org	2008-02-08 14:30:10

Modified files:
	cmirror/src    : cluster.c cluster.h functions.c functions.h 
	                 local.c 

Log message:
	- stop delaying disk log writes
	- stop placing requests into the startup queue before initial config
	- add recovering_region to checkpoint data to prevent duplicate region
	  syncing assignment.

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.15&r2=1.1.2.16
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.13&r2=1.1.2.14
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.4&r2=1.1.2.5
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.13&r2=1.1.2.14

--- cluster/cmirror/src/Attic/cluster.c	2008/02/06 23:03:05	1.1.2.15
+++ cluster/cmirror/src/Attic/cluster.c	2008/02/08 14:30:10	1.1.2.16
@@ -27,6 +27,13 @@
 static SaCkptCallbacksT callbacks = { 0, 0 };
 static SaVersionT version = { 'B', 1, 1 };
 
+#define DEBUGGING_HISTORY 20
+static char debugging[DEBUGGING_HISTORY][128];
+static int idx = 0;
+static int memberz = 0;
+static int doit = 0;
+
+
 struct checkpoint_data {
 	uint32_t requester;
 	char uuid[CPG_MAX_NAME_LENGTH];
@@ -34,6 +41,7 @@
 	int bitmap_size; /* in bytes */
 	char *sync_bits;
 	char *clean_bits;
+	char *recovering_region;
 	struct checkpoint_data *next;
 };	
 
@@ -58,44 +66,18 @@
 static struct list_head clog_cpg_list;
 
 /*
- * flow_control
- * @handle
- *
- * Returns: 1 if flow control needed, 0 otherwise
- */
-static int flow_control(cpg_handle_t handle)
-{
-	cpg_flow_control_state_t flow_control_state;
-	cpg_error_t error;
-	
-	/* FIXME: no flow control for now (cmirror should self regulate) */
-	return 0;
-
-	error = cpg_flow_control_state_get(handle, &flow_control_state);
-	if (error != CPG_OK) {
-		LOG_ERROR("Failed to get flow control state.  Reason: %d", error);
-		/* FIXME: Better error handling */
-		return 0;
-	}
-
-	return (flow_control_state == CPG_FLOW_CONTROL_ENABLED) ? 1 : 0;
-}
-
-/*
  * cluster_send
  * @tfr
  *
  * Returns: 0 on success, -Exxx on error
  */
-int cluster_send(struct clog_tfr *tfr)
+static int cluster_send(struct clog_tfr *tfr)
 {
 	int r;
 	int found;
 	struct iovec iov;
 	struct clog_cpg *entry, *tmp;
 
-	ENTER();
-	
 	list_for_each_entry_safe(entry, tmp, &clog_cpg_list, list)
 		if (!strncmp(entry->name.value, tfr->uuid, CPG_MAX_NAME_LENGTH)) {
 			found = 1;
@@ -104,26 +86,35 @@
 
 	if (!found) {
 		tfr->error = -ENOENT;
-		EXIT();
 		return -ENOENT;
 	}
 
 	iov.iov_base = tfr;
 	iov.iov_len = sizeof(struct clog_tfr) + tfr->data_size;
-	while (flow_control(entry->handle)) {
-		/*
-		 * FIXME: Don't need to sleep this long
-		 *
-		 * ... or, we could dispatch the queued messages here.
-		 */
-		LOG_PRINT("Flow control enabled.  Delaying msg [%s]",
-			  RQ_TYPE(tfr->request_type));
-		sleep(1);
-	}
+
 	r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
+	if (r == CPG_OK)
+		return 0;
+	if (r == SA_AIS_ERR_TRY_AGAIN)
+		return -EAGAIN;
+
+	LOG_ERROR("cpg_mcast_joined error: %d", r);
+
+	tfr->error = -EBADE;
+	return -EBADE;
+}
+
+int cluster_send_helper(struct clog_tfr *tfr, int line, char *file, const char *function)
+{
+	int r;
+
+	do {
+		r = cluster_send(tfr);
+		if (r)
+			LOG_ERROR("cluster_send failed at: %s:%d (%s)",
+				  file, line, function);
+	} while (r == -EAGAIN);
 
-	EXIT();
-	tfr->error = r = (r == CPG_OK) ? 0 : -EBADE;
 	return r;
 }
 
@@ -137,7 +128,7 @@
 	return r;
 }
 
-static int handle_cluster_request(struct clog_tfr *tfr, int server)
+static int handle_cluster_request(struct clog_tfr *tfr, int server, int printz)
 {
 	int r = 0;
 
@@ -152,27 +143,22 @@
 	 */
 	if ((tfr->request_type != DM_CLOG_RESUME) ||
 	    (tfr->originator == my_cluster_id))
-		r = do_request(tfr);
+		r = do_request(tfr, server);
 
 	if (server) {
-		if (r)
-			LOG_ERROR("do_request failed, unable to commit log");
-		else
-			r = commit_log(tfr);
-
 		tfr->request_type |= DM_CLOG_RESPONSE;
 
 		/*
 		 * Errors from previous functions are in the tfr struct.
 		 */
-
-		LOG_DBG("Sending response to %u on cluster: [%s/%llu]",
-			tfr->originator,
-			RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
-			(unsigned long long)tfr->seq);
+		if (printz)
+			LOG_DBG("[%s] Sending response to %u on cluster: [%s/%llu]",
+				SHORT_UUID(tfr->uuid), tfr->originator,
+				RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
+				(unsigned long long)tfr->seq);
 		r = cluster_send(tfr);
 		if (r)
-			LOG_ERROR("cluster_send failed");
+			LOG_ERROR("cluster_send failed: %s", strerror(-r));
 	}
 
 	EXIT();
@@ -209,6 +195,8 @@
 		INIT_LIST_HEAD(&l);
 		queue_remove_all(&l, cluster_queue);
 		LOG_ERROR("Current list:");
+		if (list_empty(&l))
+			LOG_ERROR("   [none]");
 		list_for_each_safe(p, n, &l) {
 			list_del_init(p);
 			t = (struct clog_tfr *)p;
@@ -257,6 +245,7 @@
 static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
 						  uint32_t cp_requester)
 {
+	int r;
 	struct checkpoint_data *new;
 
 	new = malloc(sizeof(*new));
@@ -270,7 +259,7 @@
 	strncpy(new->uuid, entry->name.value, entry->name.length);
 
 	if (entry->valid) {
-		new->bitmap_size = store_bits(entry->name.value, "clean_bits",
+		new->bitmap_size = push_state(entry->name.value, "clean_bits",
 					      &new->clean_bits);
 		if (new->bitmap_size <= 0) {
 			LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
@@ -279,7 +268,7 @@
 			return NULL;
 		}
 
-		new->bitmap_size = store_bits(entry->name.value,
+		new->bitmap_size = push_state(entry->name.value,
 					      "sync_bits", &new->sync_bits);
 		if (new->bitmap_size <= 0) {
 			LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
@@ -288,6 +277,16 @@
 			free(new);
 			return NULL;
 		}
+
+		r = push_state(entry->name.value, "recovering_region", &new->recovering_region);
+		if (r <= 0) {
+			LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
+				  new->requester);
+			free(new->sync_bits);
+			free(new->clean_bits);
+			free(new);
+			return NULL;
+		}
 	} else {
 		/*
 		 * We can store bitmaps yet, because the log is not
@@ -309,6 +308,7 @@
  */
 static void free_checkpoint(struct checkpoint_data *cp)
 {
+	free(cp->recovering_region);
 	free(cp->sync_bits);
 	free(cp->clean_bits);
 	free(cp);
@@ -335,9 +335,9 @@
 	name.length = len;
 
 	attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
-	attr.checkpointSize = cp->bitmap_size * 2;
+	attr.checkpointSize = cp->bitmap_size * 2 + strlen(cp->recovering_region) + 1;
 	attr.retentionDuration = SA_TIME_MAX;
-	attr.maxSections = 3;      /* don't know why we need +1 */
+	attr.maxSections = 4;      /* don't know why we need +1 */
 	attr.maxSectionSize = cp->bitmap_size;
 	attr.maxSectionIdSize = 22;
 
@@ -363,6 +363,7 @@
 		EXIT();
 		return -EIO; /* FIXME: better error */
 	}
+
 	/*
 	 * Add section for sync_bits
 	 */
@@ -408,7 +409,7 @@
 	}
 
 	if (rv == SA_AIS_ERR_EXIST) {
-		LOG_ERROR("export_checkpoint: clean checkpoint section already exists");
+		LOG_DBG("export_checkpoint: clean checkpoint section already exists");
 		EXIT();
 		return -EEXIST;
 	}
@@ -419,6 +420,35 @@
 		return -EIO; /* FIXME: better error */
 	}
 
+	/*
+	 * Add section for recovering_region
+	 */
+	section_id.idLen = snprintf(buf, 32, "recovering_region");
+	section_id.id = (unsigned char *)buf;
+	section_attr.sectionId = &section_id;
+	section_attr.expirationTime = SA_TIME_END;
+
+rr_create_retry:
+	rv = saCkptSectionCreate(h, &section_attr, cp->recovering_region,
+				 strlen(cp->recovering_region) + 1);
+	if (rv == SA_AIS_ERR_TRY_AGAIN) {
+		LOG_ERROR("export_checkpoint: RR create retry");
+		sleep(1);
+		goto rr_create_retry;
+	}
+
+	if (rv == SA_AIS_ERR_EXIST) {
+		LOG_DBG("export_checkpoint: RR checkpoint section already exists");
+		EXIT();
+		return -EEXIST;
+	}
+
+	if (rv != SA_AIS_OK) {
+		LOG_ERROR("export_checkpoint: RR checkpoint section creation failed");
+		EXIT();
+		return -EIO; /* FIXME: better error */
+	}
+
 	LOG_DBG("export_checkpoint: closing checkpoint");
 	saCkptCheckpointClose(h);
 
@@ -515,7 +545,7 @@
 			break;
 	}
 	saCkptSectionIterationFinalize(itr);
-	if (len != 2) {
+	if (len != 3) {
 		LOG_ERROR("import_checkpoint: %d checkpoint sections found", len);
 		sleep(1);
 		goto init_retry;
@@ -572,8 +602,9 @@
 		*/
 
 		if (iov.readSize) {
-			if (load_bits(entry->name.value, (char *)desc.sectionId.id, bitmap, iov.readSize)) {
-				LOG_ERROR("Error loading bits");
+			if (pull_state(entry->name.value, (char *)desc.sectionId.id, bitmap,
+				       iov.readSize)) {
+				LOG_ERROR("Error loading state");
 				rtn = -EIO;
 				goto fail;
 			}
@@ -645,6 +676,7 @@
 	int i;
 	int r = 0;
 	int i_am_server;
+	int response = 0;
 	struct clog_tfr *tfr = msg;
 	struct clog_tfr *startup_tfr = NULL;
 	struct clog_cpg *match;
@@ -665,7 +697,9 @@
 			(unsigned long long)tfr->seq);
 
 	if (my_cluster_id == 0xDEAD) {
-		LOG_DBG("Message before init... ignoring.\n");
+		LOG_DBG("[%s]  Message from %u before init [%s/%llu]",
+			SHORT_UUID(tfr->uuid), nodeid,
+			RQ_TYPE(tfr->request_type), (unsigned long long) tfr->seq);
 		return;
 	}
 
@@ -674,6 +708,14 @@
 		LOG_ERROR("Unable to find clog_cpg for cluster message");
 		return;
 	}
+
+	if (match->lowest_id == 0xDEAD) {
+		LOG_DBG("[%s]  Message from %u before init* [%s/%llu]",
+			SHORT_UUID(tfr->uuid), nodeid,
+			RQ_TYPE(tfr->request_type), (unsigned long long) tfr->seq);
+		return;
+	}
+
 	i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
 
 	if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) {
@@ -691,7 +733,7 @@
 					LOG_DBG("Processing delayed request %d: %s",
 						match->startup_queue->count,
 						RQ_TYPE(startup_tfr->request_type));
-					r = handle_cluster_request(startup_tfr, i_am_server);
+					r = handle_cluster_request(startup_tfr, i_am_server, 1);
 
 					if (r) {
 						LOG_ERROR("Error while processing delayed CPG message");
@@ -732,9 +774,10 @@
 		match->checkpoint_list = new;
 	}
 
-	if (tfr->request_type & DM_CLOG_RESPONSE)
+	if (tfr->request_type & DM_CLOG_RESPONSE) {
+		response = 1;
 		r = handle_cluster_response(tfr);
-	else {
+	} else {
 		tfr->originator = nodeid;
 
 		if (!match->valid) {
@@ -757,15 +800,40 @@
 			goto out;
 		}
 
-		r = handle_cluster_request(tfr, i_am_server);
+		r = handle_cluster_request(tfr, i_am_server,
+					   ((memberz != 4) || (--doit > 0)));
 	}
 
 out:
 	if (r) {
-		LOG_ERROR("[%s] Error while processing CPG message, %s: %d",
+		LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
 			  SHORT_UUID(tfr->uuid),
 			  RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
-			  r);
+			  strerror(-r));
+		LOG_ERROR("[%s]    Response  : %s", SHORT_UUID(tfr->uuid),
+			  (response) ? "YES" : "NO");
+		LOG_ERROR("[%s]    Originator: %u", SHORT_UUID(tfr->uuid), tfr->originator);
+		if (response)
+			LOG_ERROR("[%s]    Responder : %u", SHORT_UUID(tfr->uuid), nodeid);
+		LOG_ERROR("HISTORY::");
+
+		for (i = 0; i < DEBUGGING_HISTORY; i++) {
+			idx++;
+			idx = idx % DEBUGGING_HISTORY;
+			if (debugging[idx][0] == '\0')
+				continue;
+			LOG_ERROR("%d:%d) %s", i, idx, debugging[idx]);
+		}
+	} else if (!(tfr->request_type & DM_CLOG_RESPONSE)) {
+		int len;
+		idx++;
+		idx = idx % DEBUGGING_HISTORY;
+		len = sprintf(debugging[idx], "SEQ#=%llu, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
+			      (unsigned long long)tfr->seq, SHORT_UUID(tfr->uuid),
+			      RQ_TYPE(tfr->request_type),
+			      tfr->originator, (response) ? "YES" : "NO");
+		if (response)
+			sprintf(debugging[idx] + len, ", RSPR=%u", nodeid);
 	}
 	EXIT();
 }
@@ -779,10 +847,12 @@
 	int my_pid = getpid();
 	int found = 0;
 	struct clog_cpg *match, *tmp;
-	uint32_t lowest;
+	uint32_t lowest = 0xDEAD;
 
 	ENTER();
 
+	memberz = member_list_entries;
+
 	LOG_DBG("****** CPG config callback **[%s]**",
 		SHORT_UUID(gname->value));
 
@@ -821,6 +891,8 @@
 		goto out;
 	}
 
+	lowest = match->lowest_id;
+
 	/* Am I leaving? */
 	for (i = 0; i < left_list_entries; i++)
 		if (my_cluster_id == left_list[i].nodeid) {
@@ -863,6 +935,7 @@
 
 			free(match->startup_queue);
 			match->free_me = 1;
+			match->lowest_id = 0xDEAD;
 
 			goto out;
 		}			
@@ -871,8 +944,6 @@
 	if (!left_list_entries &&
 	    (member_list_entries == 1) && (joined_list_entries == 1) &&
 	    (member_list[0].nodeid == joined_list[0].nodeid)) {
-		LOG_DBG("[%s]  I am the log server (and first to join)",
-			SHORT_UUID(match->name.value));
 		match->lowest_id = my_cluster_id = joined_list[0].nodeid;
 		match->valid = 1;
 		goto out;
@@ -894,17 +965,15 @@
 		}
 	}
 
-	lowest = match->lowest_id;
+	if (member_list_entries)
+		match->lowest_id = member_list[0].nodeid;
+	else
+		match->lowest_id = 0xDEAD;
 	/* Find the lowest_id, i.e. the server */
-	for (i = 0, match->lowest_id = member_list[0].nodeid;
-	     i < member_list_entries; i++)
+	for (i = 0; i < member_list_entries; i++)
 		if (match->lowest_id > member_list[i].nodeid)
 			match->lowest_id = member_list[i].nodeid;
 
-	if (lowest != match->lowest_id)
-		LOG_DBG("[%s]  Server is now %u", SHORT_UUID(match->name.value),
-			match->lowest_id);
-
 	/*
 	 * If I am part of the joining list, I do not send checkpoints
 	 * FIXME: What are the cases where multiple nodes can join?
@@ -920,6 +989,21 @@
 	match->checkpoints_needed += i;
 
 out:
+	if (lowest != match->lowest_id)
+		LOG_DBG("[%s]  Server change %u -> %u (%u %s)",
+			SHORT_UUID(match->name.value),
+			lowest, match->lowest_id,
+			(joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid,
+			(joined_list_entries && (member_list_entries == 1)) ? 
+			"is first to join" : (joined_list_entries) ? "joined" : "left");
+	else
+		LOG_DBG("[%s]  Server unchanged at %u (%u %s)",
+			SHORT_UUID(match->name.value), lowest,
+			(joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid,
+			(joined_list_entries) ? "joined" : "left");
+
+	if (joined_list_entries && (joined_list[0].nodeid == my_cluster_id))
+		doit = 25;
 	EXIT();
 }
 
@@ -1019,6 +1103,12 @@
 
 	ENTER();
 
+	{
+		int i;
+		for(i = 0; i < DEBUGGING_HISTORY; i++)
+			debugging[i][0] = '\0';
+	}
+
 	INIT_LIST_HEAD(&clog_cpg_list);
 	rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);
 
--- cluster/cmirror/src/Attic/cluster.h	2008/01/23 21:21:06	1.1.2.2
+++ cluster/cmirror/src/Attic/cluster.h	2008/02/08 14:30:10	1.1.2.3
@@ -7,6 +7,7 @@
 int create_cluster_cpg(char *str);
 int destroy_cluster_cpg(char *str);
 
-int cluster_send(struct clog_tfr *tfr);
+int cluster_send_helper(struct clog_tfr *tfr, int line, char *file, const char *function);
+#define cluster_send(x) cluster_send_helper((x), __LINE__, __FILE__, __FUNCTION__)
 
 #endif /* __CLUSTER_LOG_CLUSTER_DOT_H__ */
--- cluster/cmirror/src/Attic/functions.c	2008/02/06 23:03:05	1.1.2.13
+++ cluster/cmirror/src/Attic/functions.c	2008/02/08 14:30:10	1.1.2.14
@@ -33,14 +33,6 @@
         uint64_t nr_regions;
 };
 
-/*
- * Used by the 'touched' variable, these macros mean:
- *   LOG_CHANGED - bits in the in-memory log have changed
- *   LOG_FLUSH   - log must be committed to disk
- */
-#define LOG_CHANGED 1
-#define LOG_FLUSH   2
-
 struct log_c {
 	struct list_head list;
 
@@ -103,13 +95,13 @@
 static void log_set_bit(struct log_c *lc, uint32_t *bs, unsigned bit)
 {
 	ext2fs_set_bit(bit, (unsigned int *) bs);
-	lc->touched |= LOG_CHANGED;
+	lc->touched = 1;
 }
 
 static void log_clear_bit(struct log_c *lc, uint32_t *bs, unsigned bit)
 {
 	ext2fs_clear_bit(bit, (unsigned int *) bs);
-	lc->touched |= LOG_CHANGED;
+	lc->touched = 1;
 }
 
 /* FIXME: Why aren't count and start the same type? */
@@ -205,7 +197,7 @@
 		if (r < 0) {
 			LOG_ERROR("rw_log:  write failure: %s",
 				  strerror(errno));
-			return -EIO;
+			return -EIO; /* Failed disk write */
 		}
 		return 0;
 	}
@@ -216,7 +208,7 @@
 		LOG_ERROR("rw_log:  read failure: %s",
 			  strerror(errno));
 	if (r != lc->disk_size)
-		return -EIO;
+		return -EIO; /* Failed disk read */
 	return 0;
 }
 
@@ -239,7 +231,7 @@
 	memset(&lh, 0, sizeof(struct log_header));
 
 	if (rw_log(lc, 0))
-		return -EIO;
+		return -EIO; /* Failed disk read */
 
 	header_from_disk(&lh, lc->disk_buffer);
 	if (lh.magic != MIRROR_MAGIC) {
@@ -285,8 +277,10 @@
 	bitset_size += (lc->region_count % 8) ? 1 : 0;
 	memcpy(lc->disk_buffer + 1024, lc->sync_bits, bitset_size);
 
-	if (rw_log(lc, 1))
-		return -EIO;
+	if (rw_log(lc, 1)) {
+		lc->log_dev_failed = 1;
+		return -EIO; /* Failed disk write */
+	}
 	return 0;
 }
 
@@ -697,6 +691,7 @@
 static int clog_resume(struct clog_tfr *tfr)
 {
 	uint32_t i;
+	int commit_log = 0;
 	struct log_c *lc = get_log(tfr->uuid);
 	size_t size = lc->bitset_uint32_count * sizeof(uint32_t);
 
@@ -715,6 +710,7 @@
 		LOG_DBG("[%s] Master resume: reading disk log",
 			  SHORT_UUID(lc->uuid));
 		lc->resume_override = 1000;
+		commit_log = 1;
 		break;
 	case 1:
 		LOG_ERROR("Error:: partial bit loading (just sync_bits)");
@@ -782,11 +778,14 @@
 		SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count);
 	lc->sync_search = 0;
 
-	/*
-	 * We mark 'touched' as LOG_FLUSH so that only the master commits
-	 * the log via 'commit_log'
-	 */
-	lc->touched = LOG_FLUSH;
+	if (commit_log && (lc->disk_fd >= 0)) {
+		tfr->error = write_log(lc);
+		if (tfr->error)
+			LOG_ERROR("Failed initial disk log write");
+		else
+			LOG_DBG("Disk log initialized");
+		lc->touched = 0;
+	}
 out:
 	lc->state = LOG_RESUMED;
 	lc->recovery_halted = 0;
@@ -917,20 +916,34 @@
  * @tfr
  *
  */
-static int clog_flush(struct clog_tfr *tfr)
+static int clog_flush(struct clog_tfr *tfr, int server)
 {
+	int r = 0;
 	struct log_c *lc = get_log(tfr->uuid);
-
+	
 	if (!lc)
 		return -EINVAL;
 
-	/* 
-	 * Actual disk flush happens in 'commit_log()'
-	 * Clear LOG_CHANGED and set LOG_FLUSH
+	if (!lc->touched)
+		return 0;
+
+	/*
+	 * Do the actual flushing of the log only
+	 * if we are the server.
 	 */
-	lc->touched = LOG_FLUSH;
+	if (server && (lc->disk_fd >= 0)) {
+		r = tfr->error = write_log(lc);
+		if (r) {
+			LOG_ERROR("Error writing to disk log");
+			return r;
+		}
+		LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid));
+	}
+
+	lc->touched = 0;
 
 	return 0;
+
 }
 
 /*
@@ -1179,14 +1192,18 @@
 
 	if (pkg->in_sync) {
 		if (log_test_bit(lc->sync_bits, pkg->region)) {
-			LOG_PRINT("  Region already in-sync: %llu",
-				  (unsigned long long)pkg->region);
+			LOG_ERROR("[%s]  Region already in-sync: region=%llu, seq=%llu, who=%u",
+				  SHORT_UUID(lc->uuid),
+				  (unsigned long long)pkg->region,
+				  (unsigned long long)tfr->seq,
+				  tfr->originator);
 		} else {
 			log_set_bit(lc, lc->sync_bits, pkg->region);
 			lc->sync_count++;
-			LOG_DBG("[%s] sync_count = %llu, Region %llu marked in-sync by %u",
+			LOG_DBG("[%s] sync_count=%llu, Region %llu marked in-sync by %u, seq=%llu",
 				SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count,
-				(unsigned long long)pkg->region, tfr->originator);
+				(unsigned long long)pkg->region, tfr->originator,
+				(unsigned long long)tfr->seq);
 		}
 	} else if (log_test_bit(lc->sync_bits, pkg->region)) {
 		lc->sync_count--;
@@ -1249,7 +1266,7 @@
 
 	tfr->data_size = sprintf(data, "3 clustered_disk %d:%d %c",
 				 major(statbuf.st_rdev), minor(statbuf.st_rdev),
-				 'A'); /* FIXME: detect dead device */
+				 (lc->log_dev_failed) ? 'D' : 'A');
 
 	return 0;
 }
@@ -1396,6 +1413,7 @@
 /*
  * do_request
  * @tfr: the request
+ * @server: is this request performed by the server
  *
  * An inability to perform this function will return an error
  * from this function.  However, an inability to successfully
@@ -1403,7 +1421,7 @@
  *
  * Returns: 0 on success, -EXXX on error
  */
-int do_request(struct clog_tfr *tfr)
+int do_request(struct clog_tfr *tfr, int server)
 {
 	int r;
 
@@ -1442,7 +1460,7 @@
 		r = clog_in_sync(tfr);
 		break;
 	case DM_CLOG_FLUSH:
-		r = clog_flush(tfr);
+		r = clog_flush(tfr, server);
 		break;
 	case DM_CLOG_MARK_REGION:
 		r = clog_mark_region(tfr);
@@ -1489,52 +1507,6 @@
 	return 0;
 }
 
-/*
- * commit_log
- * @tfr: commit log associated with this request
- *
- * This function will also set 'tfr->error' on failure
- *
- * Returns: 0 on success, -EXXX on error
- */
-int commit_log(struct clog_tfr *tfr)
-{
-	int r = 0;
-	struct log_c *lc;
-
-	ENTER();
-
-	lc = get_log(tfr->uuid);
-	
-	if (!lc) {
-		LOG_DBG("No log found");
-		tfr->error = -EINVAL;
-		r = -EINVAL;
-		goto out;
-	}
-
-	if (!(lc->touched & LOG_FLUSH))
-		goto out;
-
-	if (lc->disk_fd >= 0) {
-		r = tfr->error = write_log(lc);
-		if (r) {
-			LOG_ERROR("Error writing to disk log");
-			return -EIO;
-		}
-		LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid));
-	}
-
-	if (lc->touched & LOG_CHANGED)
-		LOG_ERROR("WARNING:  Log has changed during a flush operation");
-
-	lc->touched &= ~LOG_FLUSH;
-
-out:
-	EXIT();
-	return 0;
-}
-
 static void print_bits(char *buf, int size)
 {
 #ifdef DEBUG
@@ -1556,7 +1528,8 @@
 #endif
 }
 
-int store_bits(const char *uuid, const char *which, char **buf)
+/* int store_bits(const char *uuid, const char *which, char **buf)*/
+int push_state(const char *uuid, const char *which, char **buf)
 {
 	int bitset_size;
 	struct log_c *lc;
@@ -1570,8 +1543,18 @@
 		return -EINVAL;
 	}
 
+	if (!strcmp(which, "recovering_region")) {
+		*buf = malloc(32); /* easily covers largest 64-bit int */
+		if (!*buf)
+			return -ENOMEM;
+		sprintf(*buf, "%llu", (unsigned long long)lc->recovering_region);
+
+		return 32;
+	}
+
 	bitset_size = lc->bitset_uint32_count * sizeof(*lc->clean_bits);
 	*buf = malloc(bitset_size);
+
 	if (!*buf) {
 		LOG_ERROR("store_bits: Unable to allocate memory");
 		return -ENOMEM;
@@ -1590,23 +1573,33 @@
 	return bitset_size;
 }
 
-int load_bits(const char *uuid, const char *which, char *buf, int size)
+/*int load_bits(const char *uuid, const char *which, char *buf, int size)*/
+int pull_state(const char *uuid, const char *which, char *buf, int size)
 {
 	int bitset_size;
 	struct log_c *lc;
 
 	if (!buf)
-		LOG_ERROR("load_bits: buf == NULL");
+		LOG_ERROR("pull_state: buf == NULL");
 
 	lc = get_log(uuid);
 	if (!lc) {
-		LOG_ERROR("load_bits: No log found for %s", uuid);
+		LOG_ERROR("pull_state: No log found for %s", uuid);
 		return -EINVAL;
 	}
 
+	if (!strncmp(which, "recovering_region", 17)) {
+		sscanf(buf, "%llu", (unsigned long long *)&lc->recovering_region);
+		LOG_DBG("[%s] recovering_region set to %llu",
+			SHORT_UUID(uuid),
+			(unsigned long long)lc->recovering_region);
+		return 0;
+	}
+
 	bitset_size = lc->bitset_uint32_count * sizeof(*lc->clean_bits);
 	if (bitset_size != size) {
-		LOG_ERROR("load_bits: bad bitset_size");
+		LOG_ERROR("pull_state(%s): bad bitset_size (%d vs %d)",
+			  which, size, bitset_size);
 		return -EINVAL;
 	}
 
--- cluster/cmirror/src/Attic/functions.h	2008/02/06 23:03:05	1.1.2.4
+++ cluster/cmirror/src/Attic/functions.h	2008/02/08 14:30:10	1.1.2.5
@@ -8,10 +8,16 @@
 
 int local_resume(struct clog_tfr *tfr);
 int cluster_postsuspend(char *);
-int do_request(struct clog_tfr *tfr);
-int commit_log(struct clog_tfr *tfr);
+
+int do_request(struct clog_tfr *tfr, int server);
+
+/*
 int store_bits(const char *uuid, const char *which, char **buf);
 int load_bits(const char *uuid, const char *which, char *buf, int size);
+*/
+int push_state(const char *uuid, const char *which, char **buf);
+int pull_state(const char *uuid, const char *which, char *buf, int size);
+
 int log_get_state(struct clog_tfr *tfr);
 int log_status(int);
 #endif /* __CLOG_FUNCTIONS_DOT_H__ */
--- cluster/cmirror/src/Attic/local.c	2008/02/06 23:03:05	1.1.2.13
+++ cluster/cmirror/src/Attic/local.c	2008/02/08 14:30:10	1.1.2.14
@@ -166,7 +166,8 @@
 	case DM_CLOG_STATUS_INFO:
 	case DM_CLOG_STATUS_TABLE:
 	case DM_CLOG_PRESUSPEND:
-		r = do_request(tfr);
+		/* We do not specify ourselves as server here */
+		r = do_request(tfr, 0);
 		if (r)
 			LOG_DBG("Returning failed request to kernel [%s]",
 				RQ_TYPE(tfr->request_type));
@@ -177,7 +178,8 @@
 			
 		break;
 	case DM_CLOG_POSTSUSPEND:
-		r = do_request(tfr);
+		/* We do not specify ourselves as server here */
+		r = do_request(tfr, 0);
 		if (r) {
 			LOG_DBG("Returning failed request to kernel [%s]",
 				RQ_TYPE(tfr->request_type));
@@ -212,6 +214,7 @@
 			LOG_ERROR("[%s] Unable to send %s to cluster: %s",
 				  SHORT_UUID(tfr->uuid),
 				  RQ_TYPE(tfr->request_type), strerror(-r));
+			tfr->data_size = 0;
 			tfr->error = r;
 			kernel_send(tfr);
 		} else {


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]