
[Cluster-devel] cluster cmirror/src/Makefile cmirror/src/clogd ...



CVSROOT:	/cvs/cluster
Module name:	cluster
Branch: 	RHEL5
Changes by:	jbrassow sourceware org	2007-11-03 18:37:49

Modified files:
	cmirror/src    : Makefile clogd.c cluster.c common.h functions.c 
	                 local.c 
	cmirror-kernel/src: dm-clog-tfr.h dm-clog.c 

Log message:
	- Addition of disk logging
	- Add 'is_remote_recovering' function
	- Checkpoint clean-ups

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/Makefile.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.3.2.2&r2=1.3.2.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/clogd.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/common.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror-kernel/src/dm-clog-tfr.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror-kernel/src/dm-clog.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.2.2.3&r2=1.2.2.4

--- cluster/cmirror/src/Attic/Makefile	2007/08/30 15:49:32	1.3.2.2
+++ cluster/cmirror/src/Attic/Makefile	2007/11/03 18:37:48	1.3.2.3
@@ -30,7 +30,7 @@
 endif
 
 ifeq ($(DEBUG),log)
-CFLAGS += -DDEBUG_LOG
+CFLAGS += -DDEBUG
 endif
 
 CFLAGS += -g
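
The Makefile hunk above retires the old -DDEBUG_LOG spelling in favour of -DDEBUG for DEBUG=log builds; the LOG_DBG() line added to clogd.c below presumably only produces output in such a build. As a rough illustration of how that gating typically works (a hedged sketch only, not the project's actual logging header):

/*
 * Illustrative only: a LOG_DBG() that compiles away unless -DDEBUG is
 * given, mirroring what the DEBUG=log Makefile branch enables.
 */
#include <stdio.h>

#ifdef DEBUG
#define LOG_DBG(fmt, args...) printf("[dbg] " fmt "\n", ## args)
#else
#define LOG_DBG(fmt, args...) do { } while (0)
#endif

int main(void)
{
	LOG_DBG(" Compiled with debugging.");	/* silent without -DDEBUG */
	return 0;
}
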
--- cluster/cmirror/src/Attic/clogd.c	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/clogd.c	2007/11/03 18:37:48	1.1.2.2
@@ -42,6 +42,7 @@
 
 	LOG_PRINT("Starting clogd:");
 	LOG_PRINT(" Built: "__DATE__" "__TIME__"\n");
+	LOG_DBG(" Compiled with debugging.");
 
 	while (!exit_now) {
 		links_monitor();
--- cluster/cmirror/src/Attic/cluster.c	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/cluster.c	2007/11/03 18:37:48	1.1.2.2
@@ -21,6 +21,7 @@
 
 #define DM_CLOG_RESPONSE 0x1000 /* in last byte of 32-bit value */
 #define DM_CLOG_CHECKPOINT_READY ((uint32_t)-1)
+#define DM_CLOG_CHECKPOINT_REQUEST 0
 
 static uint32_t my_cluster_id = 0xDEAD;
 static SaCkptHandleT ckpt_handle;
@@ -102,11 +103,20 @@
 
 static int handle_cluster_request(struct clog_tfr *tfr, int server)
 {
-	int r;
+	int r = 0;
 
 	ENTER("%s", RQ_TYPE(tfr->request_type));
 
-	r = do_request(tfr);
+	/*
+	 * With resumes, we only handle our own.
+	 * Resume is a special case that requires
+	 * local action (to set up CPG), followed by
+	 * a cluster action to co-ordinate reading
+	 * the disk and checkpointing
+	 */
+	if ((tfr->request_type != DM_CLOG_RESUME) ||
+	    (tfr->originator == my_cluster_id))
+		r = do_request(tfr);
 
 	if (server) {
 		if (r)
@@ -176,6 +186,75 @@
 	return NULL;
 }
 
+/*
+ * prepare_checkpoint
+ * @entry: clog_cpg describing the log
+ * @cp_requester: nodeid requesting the checkpoint
+ *
+ * Creates and fills in a new checkpoint_data struct.
+ *
+ * Returns: checkpoint_data on success, NULL on error
+ */
+static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
+						  uint32_t cp_requester)
+{
+	struct checkpoint_data *new;
+
+	new = malloc(sizeof(*new));
+	if (!new) {
+		LOG_ERROR("Unable to create checkpoint data for %u",
+			  cp_requester);
+		return NULL;
+	}
+	memset(new, 0, sizeof(*new));
+	new->requester = cp_requester;
+	strncpy(new->uuid, entry->name.value, entry->name.length);
+
+	if (entry->valid) {
+		new->bitmap_size = store_bits(entry->name.value, "clean_bits",
+					      &new->clean_bits);
+		if (new->bitmap_size <= 0) {
+			LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
+				  new->requester);
+			free(new);
+			return NULL;
+		}
+
+		new->bitmap_size = store_bits(entry->name.value,
+					      "sync_bits", &new->sync_bits);
+		if (new->bitmap_size <= 0) {
+			LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
+				  new->requester);
+			free(new->clean_bits);
+			free(new);
+			return NULL;
+		}
+	} else {
+		/*
+		 * We can't store bitmaps yet, because the log is not
+		 * valid yet.  The new machine will have to ask
+		 * specifically for a new checkpoint.
+		 */
+		LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
+			  new->requester);
+		new->bitmap_size = 0;
+	}
+
+	return new;
+}
+
+/*
+ * free_checkpoint
+ * @cp: the checkpoint_data struct to free
+ *
+ */
+static void free_checkpoint(struct checkpoint_data *cp)
+{
+	free(cp->sync_bits);
+	free(cp->clean_bits);
+	free(cp);
+}
+
 static int export_checkpoint(struct checkpoint_data *cp)
 {
 	SaCkptCheckpointCreationAttributesT attr;
@@ -307,6 +386,7 @@
 
 static int import_checkpoint(struct clog_cpg *entry)
 {
+	int rtn = 0;
 	SaCkptCheckpointHandleT h;
 	SaCkptSectionIterationHandleT itr;
 	SaCkptSectionDescriptorT desc;
@@ -385,8 +465,8 @@
 
 		if (rv != SA_AIS_OK) {
 			LOG_ERROR("import_checkpoint: clean checkpoint section creation failed");
-			EXIT();
-			return -EIO; /* FIXME: better error */
+			rtn = -EIO; /* FIXME: better error */
+			goto fail;
 		}
 
 		if (!desc.sectionSize) {
@@ -410,28 +490,38 @@
 
 		if (rv != SA_AIS_OK) {
 			LOG_ERROR("import_checkpoint: ckpt read error");
-			EXIT();
-			return -EIO; /* FIXME: better error */
+			rtn = -EIO; /* FIXME: better error */
+			goto fail;
 		}
 
+		/* FIXME: Is this catching something special?
 		if (!iov.readSize) {
 			LOG_ERROR("%s section empty", (char *)desc.sectionId.id);
 			continue;
 		}
+		*/
 
-		if (load_bits(entry->name.value, (char *)desc.sectionId.id, bitmap, iov.readSize)) {
-			LOG_ERROR("Error loading bits");
-			EXIT();
-			return -EIO;
+		if (iov.readSize) {
+			if (load_bits(entry->name.value, (char *)desc.sectionId.id, bitmap, iov.readSize)) {
+				LOG_ERROR("Error loading bits");
+				rtn = -EIO;
+				goto fail;
+			}
+		} else {
+			/* Need to request new checkpoint */
+			rtn = -EAGAIN;
+			goto fail;
 		}
 	}
+
+fail:
 	saCkptSectionIterationFinalize(itr);
 
 	saCkptCheckpointClose(h);
 
 	free(bitmap);
 	EXIT();
-	return 0;
+	return rtn;
 }
 
 static int do_cluster_work(void *data)
@@ -455,9 +545,7 @@
 			export_checkpoint(cp);
 
 			entry->checkpoint_list = cp->next;
-			free(cp->sync_bits);
-			free(cp->clean_bits);
-			free(cp);
+			free_checkpoint(cp);
 			cp = entry->checkpoint_list;
 		}
 	}
@@ -471,6 +559,7 @@
 	int r = 0;
 	struct clog_tfr *tfr = msg;
 	struct clog_tfr *startup_tfr = NULL;
+	struct clog_tfr *cp_tfr = NULL;
 	struct clog_cpg *match;
 
 	ENTER();
@@ -481,6 +570,13 @@
 	LOG_DBG("Message (len = %d) from node/pid %u/%d", msg_len,
 		  nodeid, pid);
 
+	if (tfr->request_type & DM_CLOG_RESPONSE)
+		LOG_DBG("Response from cluster received %s",
+			RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE));
+	else
+		LOG_DBG("Request from cluster received %s",
+			RQ_TYPE(tfr->request_type));
+
 	if (my_cluster_id == 0xDEAD) {
 		LOG_DBG("Message before init... ignoring.\n");
 		goto out;
@@ -494,10 +590,64 @@
 
 	if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) {
 		if ((!match->valid) && (my_cluster_id == tfr->originator)) {
-			if (import_checkpoint(match))
-				LOG_ERROR("Failed to import checkpoint");
-			else
+			switch (import_checkpoint(match)) {
+			case 0:
+				LOG_DBG("Checkpoint data received.  Log is now valid");
 				match->valid = 1;
+				break;
+			case -EAGAIN:
+				LOG_PRINT("Checkpoint data empty.  Requesting new checkpoint.");
+				
+				cp_tfr = queue_remove(free_queue);
+				if (!cp_tfr) {
+					/* FIXME: better error handling */
+					LOG_ERROR("No clog_tfr struct available");
+					goto out;
+				}
+				memset(cp_tfr, 0, sizeof(*cp_tfr));
+				cp_tfr->request_type = DM_CLOG_CHECKPOINT_REQUEST;
+
+				cp_tfr->originator = my_cluster_id;
+
+				strncpy(cp_tfr->uuid, tfr->uuid, CPG_MAX_NAME_LENGTH);
+
+				if ((r = cluster_send(cp_tfr))) {
+					/* FIXME: better error handling */
+					LOG_ERROR("Failed to send checkpoint request");
+					queue_add(cp_tfr, free_queue);
+					goto out;
+				}
+				queue_add(cp_tfr, free_queue);
+
+				break;
+			default:
+				LOG_ERROR("Failed to import checkpoint");
+				/* Could we retry? */
+			}
+		}
+		goto out;
+	}
+
+	if (tfr->request_type == DM_CLOG_CHECKPOINT_REQUEST) {
+		if (tfr->originator == my_cluster_id) {
+			/*
+			 * The checkpoint includes any request up to the
+			 * request for checkpoint.  So, we must clear any
+			 * previous requests we were storing.
+			 */
+			while ((startup_tfr = queue_remove(match->startup_queue)))
+				queue_add(startup_tfr, free_queue);
+		} else if (my_cluster_id == match->lowest_id) {
+			struct checkpoint_data *new;
+
+			new = prepare_checkpoint(match, tfr->originator);
+			if (!new) {
+				/* FIXME: Need better error handling */
+				LOG_ERROR("Failed to prepare checkpoint!!!");
+				goto out;
+			}
+			new->next = match->checkpoint_list;
+			match->checkpoint_list = new;
 		}
 		goto out;
 	}
@@ -517,8 +667,8 @@
 
 		while ((startup_tfr = queue_remove(match->startup_queue))) {
 			LOG_DBG("Processing delayed request %d: %s",
-				  match->startup_queue->count,
-				  RQ_TYPE(startup_tfr->request_type));
+				match->startup_queue->count,
+				RQ_TYPE(startup_tfr->request_type));
 			r = handle_cluster_request(startup_tfr,
 						   (my_cluster_id == match->lowest_id) ? 1 : 0);
 			if (r) {
@@ -562,8 +712,12 @@
 
 	LOG_PRINT("* MEMBERS (%d):", member_list_entries);
 	for (i = 0; i < member_list_entries; i++)
+		/*
 		LOG_PRINT("*   [%d] nodeid: %d, pid: %d",
 			  i, member_list[i].nodeid, member_list[i].pid);
+		*/
+		syslog(LOG_NOTICE, "*   [%d] nodeid: %d, pid: %d",
+		       i, member_list[i].nodeid, member_list[i].pid);
 
 	LOG_PRINT("* LEAVING (%d):", left_list_entries);
 	for (i = 0; i < left_list_entries; i++)
@@ -641,33 +795,11 @@
 
 	if (do_checkpoint) {
 		struct checkpoint_data *new;
-		/* FIXME: might need to wait until cleared... */
+
 		for (i = 0; i < joined_list_entries; i++) {
-			new = malloc(sizeof(*new));
-			if (!new) {
-				LOG_ERROR("Unable to create checkpoint data for %u",
-					  joined_list[i].nodeid);
-				goto out;
-			}
-			memset(new, 0, sizeof(*new));
-			new->requester = joined_list[i].nodeid;
-			strncpy(new->uuid, match->name.value, match->name.length);
-			new->bitmap_size = store_bits(match->name.value, "clean_bits", &new->clean_bits);
-			if (new->bitmap_size <= 0) {
-				LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
-					  joined_list[i].nodeid);
-				free(new);
+			new = prepare_checkpoint(match, joined_list[i].nodeid);
+			if (!new)
 				goto out;
-			}
-
-			new->bitmap_size = store_bits(match->name.value,
-						      "sync_bits", &new->sync_bits);
-			if (new->bitmap_size <= 0) {
-				LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
-					  joined_list[i].nodeid);
-				free(new->clean_bits);
-				free(new);
-			}
 			new->next = match->checkpoint_list;
 			match->checkpoint_list = new;
 		}
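
The cluster.c changes above amount to a small checkpoint-on-demand protocol: a node that receives DM_CLOG_CHECKPOINT_READY but finds the imported checkpoint empty replies with DM_CLOG_CHECKPOINT_REQUEST, and the lowest-id node answers by queueing a fresh checkpoint built by the new prepare_checkpoint()/free_checkpoint() pair, which do_cluster_work() later exports and releases. The following minimal sketch shows just that allocate/queue/export/free life cycle with a simplified checkpoint_data struct (the real one also carries a uuid, and its bitmaps come from store_bits() and travel through the openais checkpoint service):

/*
 * Hedged sketch of the prepare/export/free cycle used in cluster.c.
 * The struct is simplified and the "export" step is just a printf here.
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

struct checkpoint_data {
	uint32_t requester;		/* nodeid that asked for the checkpoint */
	uint32_t *clean_bits;
	uint32_t *sync_bits;
	int bitmap_size;
	struct checkpoint_data *next;
};

static struct checkpoint_data *prepare_checkpoint(uint32_t requester, int words)
{
	struct checkpoint_data *new = calloc(1, sizeof(*new));

	if (!new)
		return NULL;
	new->requester = requester;
	new->bitmap_size = words * sizeof(uint32_t);
	new->clean_bits = calloc(words, sizeof(uint32_t));
	new->sync_bits = calloc(words, sizeof(uint32_t));
	if (!new->clean_bits || !new->sync_bits) {
		free(new->clean_bits);
		free(new->sync_bits);
		free(new);
		return NULL;
	}
	return new;
}

static void free_checkpoint(struct checkpoint_data *cp)
{
	free(cp->sync_bits);
	free(cp->clean_bits);
	free(cp);
}

int main(void)
{
	struct checkpoint_data *list = NULL, *cp;

	/* The lowest-id node queues one checkpoint per requester ... */
	cp = prepare_checkpoint(2, 4);
	if (cp) {
		cp->next = list;
		list = cp;
	}

	/* ... and later walks the list, exporting and freeing each one. */
	while ((cp = list)) {
		printf("exporting checkpoint for node %u\n", cp->requester);
		list = cp->next;
		free_checkpoint(cp);
	}
	return 0;
}
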
--- cluster/cmirror/src/Attic/common.h	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/common.h	2007/11/03 18:37:48	1.1.2.2
@@ -16,6 +16,7 @@
 
 #define EXIT_QUEUE_NOMEM           7
 
+/* Located in dm-clog-tfr.h
 #define RQ_TYPE(x) \
 	((x) == DM_CLOG_CTR) ? "DM_CLOG_CTR" : \
 	((x) == DM_CLOG_DTR) ? "DM_CLOG_DTR" : \
@@ -34,5 +35,6 @@
 	((x) == DM_CLOG_STATUS_INFO) ? "DM_CLOG_STATUS_INFO" : \
 	((x) == DM_CLOG_STATUS_TABLE) ? "DM_CLOG_STATUS_TABLE" : \
 	NULL
+*/
 
 #endif /* __CLUSTER_LOG_COMMON_DOT_H__ */
--- cluster/cmirror/src/Attic/functions.c	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/functions.c	2007/11/03 18:37:48	1.1.2.2
@@ -1,8 +1,15 @@
 #include <stdint.h>
 #include <errno.h>
 #include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <unistd.h>
 #include <ext2fs/ext2_fs.h>
 #include <ext2fs/ext2fs.h>
+#include <linux/kdev_t.h>
+#define __USE_GNU /* for O_DIRECT */
+#include <fcntl.h>
 #include "functions.h"
 #include "queues.h"
 #include "common.h"
@@ -11,6 +18,21 @@
 
 #define BYTE_SHIFT 3
 
+/*
+ * Magic for persistent mirrors: "MiRr"
+ * Following on-disk header information is stolen from
+ * drivers/md/dm-log.c
+ */
+#define MIRROR_MAGIC 0x4D695272
+#define MIRROR_DISK_VERSION 2
+#define LOG_OFFSET 2
+
+struct log_header {
+        uint32_t magic;
+        uint32_t version;
+        uint64_t nr_regions;
+};
+
 struct log_c {
 	struct list_head list;
 	char uuid[DM_UUID_LEN];
@@ -26,6 +48,8 @@
 	uint64_t recovering_region; /* -1 means not recovering */
 	int sync_search;
 
+	int resume_override;
+
         enum sync {
                 DEFAULTSYNC,    /* Synchronize if necessary */
                 NOSYNC,         /* Devices known to be already in sync */
@@ -33,9 +57,14 @@
         } sync;
 
 	int disk_fd;            /* -1 means no disk log */
+	int log_dev_failed;
+	uint64_t disk_nr_regions;
+	size_t disk_size;       /* size of disk_buffer in bytes */
+	void *disk_buffer;      /* aligned memory for O_DIRECT */
 };
 
 static struct list_head log_list = LIST_HEAD_INIT(log_list);
+static struct list_head log_pending_list = LIST_HEAD_INIT(log_pending_list);
 
 static int log_test_bit(uint32_t *bs, unsigned bit)
 {
@@ -61,6 +90,21 @@
 	return start;
 }
 
+static uint64_t count_bits32(uint32_t *addr, uint32_t count)
+{
+	int j;
+	uint32_t i;
+	uint64_t rtn = 0;
+
+	for (i = 0; i < count; i++) {
+		if (!addr[i])
+			continue;
+		for (j = 0; j < 32; j++)
+			rtn += (addr[i] & (1<<j)) ? 1 : 0;
+	}
+	return rtn;
+}
+
 /*
  * get_log
  * @tfr
@@ -82,9 +126,191 @@
 	return NULL;
 }
 
+/*
+ * get_pending_log
+ * @tfr
+ *
+ * Pending logs are logs that have been 'clog_ctr'ed, but
+ * have not joined the CPG (via clog_resume).
+ *
+ * Returns: log if found, NULL otherwise
+ */
+static struct log_c *get_pending_log(const char *uuid)
+{
+	struct list_head *l;
+	struct log_c *lc;
+
+	/* FIXME: Need prefetch to do this right */
+	__list_for_each(l, &log_pending_list) {
+		lc = list_entry(l, struct log_c, list);
+		if (!strcmp(lc->uuid, uuid))
+			return lc;
+	}
+
+	return NULL;
+}
+
+static void header_to_disk(struct log_header *mem, struct log_header *disk)
+{
+	memcpy(disk, mem, sizeof(struct log_header));
+}
+
+static void header_from_disk(struct log_header *mem, struct log_header *disk)
+{
+	memcpy(mem, disk, sizeof(struct log_header));
+}
+
+static int rw_log(struct log_c *lc, int do_write)
+{
+	int r;
+
+	r = lseek(lc->disk_fd, 0, SEEK_SET);
+	if (r < 0) {
+		LOG_ERROR("rw_log:  lseek failure: %s",
+			  strerror(errno));
+		return -errno;
+	}
+
+	if (do_write) {
+		r = write(lc->disk_fd, lc->disk_buffer, lc->disk_size);
+		if (r < 0) {
+			LOG_ERROR("rw_log:  write failure: %s",
+				  strerror(errno));
+			return -EIO;
+		}
+		return 0;
+	}
+
+	/* Read */
+	r = read(lc->disk_fd, lc->disk_buffer, lc->disk_size);
+	if (r < 0)
+		LOG_ERROR("rw_log:  read failure: %s",
+			  strerror(errno));
+	if (r != lc->disk_size)
+		return -EIO;
+	return 0;
+}
+
+/*
+ * read_log
+ * @lc
+ *
+ * Valid return codes:
+ *   -EINVAL:  Invalid header, bits not copied
+ *   -EIO:     Unable to read disk log
+ *    0:       Valid header, disk bits -> lc->clean_bits
+ *
+ * Returns: 0 on success, -EXXX on failure
+ */
+static int read_log(struct log_c *lc)
+{
+	struct log_header lh;
+	size_t bitset_size;
+
+	memset(&lh, 0, sizeof(struct log_header));
+
+	if (rw_log(lc, 0))
+		return -EIO;
+
+	header_from_disk(&lh, lc->disk_buffer);
+	if (lh.magic != MIRROR_MAGIC) {
+		LOG_ERROR("Header not valid");
+		LOG_ERROR("  magic     : %x  (expected: %x)",
+			  lh.magic, MIRROR_MAGIC);
+		LOG_ERROR("  version   : %u", lh.version);
+		LOG_ERROR("  nr_regions: %llu", lh.nr_regions);
+		LOG_ERROR("*** %s ***", strerror(EINVAL));
+		return -EINVAL;
+	}
+
+	lc->disk_nr_regions = lh.nr_regions;
+
+	/* Read disk bits into clean_bits */
+	bitset_size = lc->region_count / 8;
+	bitset_size += (lc->region_count % 8) ? 1 : 0;
+	memcpy(lc->clean_bits, lc->disk_buffer + 1024, bitset_size);
+
+	return 0;
+}
+
+/*
+ * write_log
+ * @lc
+ *
+ * Returns: 0 on success, -EIO on failure
+ */
+static int write_log(struct log_c *lc)
+{
+	struct log_header lh;
+	size_t bitset_size;
+
+	lh.magic = MIRROR_MAGIC;
+	lh.version = MIRROR_DISK_VERSION;
+	lh.nr_regions = lc->region_count;
+
+	header_to_disk(&lh, lc->disk_buffer);
+
+	/* Write disk bits from clean_bits */
+	bitset_size = lc->region_count / 8;
+	bitset_size += (lc->region_count % 8) ? 1 : 0;
+	memcpy(lc->disk_buffer + 1024, lc->sync_bits, bitset_size);
+
+	if (rw_log(lc, 1))
+		return -EIO;
+	return 0;
+}
+
+static int find_disk_path(char *major_minor_str, char *path_rtn)
+{
+	int r;
+	DIR *dp;
+	struct dirent *dep;
+	struct stat statbuf;
+	int major, minor;
+
+	r = sscanf(major_minor_str, "%d:%d", &major, &minor);
+	if (r != 2)
+		return -EINVAL;
+
+	LOG_DBG("Checking /dev/mapper for device %d:%d", major, minor);
+	/* Check /dev/mapper dir */
+	dp = opendir("/dev/mapper");
+	if (!dp)
+		return -ENOENT;
+
+	while ((dep = readdir(dp)) != NULL) {
+		/*
+		 * FIXME: This is racy.  By the time the path is used,
+		 * it may point to something else.  'fstat' will be
+		 * required upon opening to ensure we got what we
+		 * wanted.
+		 */
+
+		sprintf(path_rtn, "/dev/mapper/%s", dep->d_name);
+		stat(path_rtn, &statbuf);
+		if (S_ISBLK(statbuf.st_mode) &&
+		    (major(statbuf.st_rdev) == major) &&
+		    (minor(statbuf.st_rdev) == minor)) {
+			LOG_DBG("  %s: YES", dep->d_name);
+			closedir(dp);
+			return 0;
+		} else {
+			LOG_DBG("  %s: NO", dep->d_name);
+		}
+	}
+
+	closedir(dp);
+
+	LOG_DBG("Path not found for %d/%d", major, minor);
+	LOG_DBG("Creating /dev/mapper/%d-%d", major, minor);
+	sprintf(path_rtn, "/dev/mapper/%d-%d", major, minor);
+	r = mknod(path_rtn, S_IFBLK | S_IRUSR | S_IWUSR, MKDEV(major, minor));
+
+	return r ? -errno : 0;
+}
+
 static int _clog_ctr(int argc, char **argv, uint64_t device_size)
 {
-	int disk_log = 0;
 	int r = 0;
 	char *p;
 	uint64_t region_size;
@@ -92,6 +318,12 @@
 	uint32_t bitset_size;
 	struct log_c *lc = NULL;
 	enum sync sync = DEFAULTSYNC;
+
+	int disk_log = 0;
+	char disk_path[128];
+	size_t page_size;
+	int pages;
+
 	ENTER();
 
 	/* If core log request, then argv[0] will be region_size */
@@ -104,6 +336,13 @@
 			r = -EINVAL;
 			goto fail;
 		}
+
+		r = find_disk_path(argv[0], disk_path);
+		if (r) {
+			LOG_ERROR("Unable to find path to device %s", argv[0]);
+			goto fail;
+		}
+		LOG_DBG("Clustered log disk is %s", disk_path);
 	} else {
 		disk_log = 0;
 
@@ -147,7 +386,11 @@
 	lc->region_size = region_size;
 	lc->region_count = region_count;
 	lc->sync = sync;
+	lc->sync_search = 0;
+	lc->recovering_region = (uint64_t)-1;
 	strncpy(lc->uuid, argv[1 + disk_log], DM_UUID_LEN);
+	lc->disk_fd = -1;
+	lc->log_dev_failed = 0;
 
 	lc->bitset_uint32_count = region_count / (sizeof(*lc->clean_bits) << BYTE_SHIFT);
 	if (region_count % (sizeof(*lc->clean_bits) << BYTE_SHIFT))
@@ -155,10 +398,9 @@
 
 	bitset_size = lc->bitset_uint32_count * sizeof(*lc->clean_bits);
 
-	lc->clean_bits = malloc(bitset_size);
+	lc->clean_bits = malloc(bitset_size);	
 	if (!lc->clean_bits) {
 		LOG_ERROR("Unable to allocate clean bitset");
-		free(lc);
 		r = -ENOMEM;
 		goto fail;
 	}
@@ -167,36 +409,52 @@
 	lc->sync_bits = malloc(bitset_size);
 	if (!lc->sync_bits) {
 		LOG_ERROR("Unable to allocate sync bitset");
-		free(lc->clean_bits);
-		free(lc);
 		r = -ENOMEM;
 		goto fail;
 	}
 	memset(lc->sync_bits, (sync == NOSYNC) ? -1 : 0, bitset_size);
 	lc->sync_count = (sync == NOSYNC) ? region_count : 0;
-
-	lc->sync_search = 0;
-	lc->disk_fd = -1;
-	lc->recovering_region = (uint64_t)-1;
-
 	if (disk_log) {
-		/*
-		 * FIXME:
-		 *	- open device
-		 *	- allocate direct I/O space
-		 *	- test read before resume?
-		 */
-		LOG_ERROR("clustered_disk log not implemented");
-		free(lc->sync_bits);
-		free(lc->clean_bits);
-		free(lc);
-		r = -ENOSYS;
-		goto fail;
+		page_size = sysconf(_SC_PAGESIZE);
+		pages = bitset_size/page_size;
+		pages += bitset_size%page_size ? 1 : 0;
+		pages += 1; /* for header */
+
+		r = open(disk_path, O_RDWR | O_DIRECT);
+		if (r < 0) {
+			LOG_ERROR("Unable to open log device, %s: %s",
+				  disk_path, strerror(errno));
+			r = errno;
+			goto fail;
+		}
+		lc->disk_fd = r;
+		lc->disk_size = pages * page_size;
+
+		r = posix_memalign(&(lc->disk_buffer), page_size,
+			lc->disk_size);
+		if (r) {
+			LOG_ERROR("Unable to allocate memory for disk_buffer");
+			goto fail;
+		}
+		LOG_DBG("Disk log ready");
 	}
 
-	list_add(&lc->list, &log_list);
+	list_add(&lc->list, &log_pending_list);
 
+	EXIT();
+	return 0;
 fail:
+	if (lc) {
+		if (lc->clean_bits)
+			free(lc->clean_bits);
+		if (lc->sync_bits)
+			free(lc->sync_bits);
+		if (lc->disk_buffer)
+			free(lc->disk_buffer);
+		if (lc->disk_fd >= 0)
+			close(lc->disk_fd);
+		free(lc);
+	}
 	EXIT();
 	return r;
 }
@@ -227,6 +485,9 @@
 
 	if (strlen(tfr->data) != tfr->data_size) {
 		LOG_ERROR("Received constructor request with bad data");
+		LOG_DBG("strlen(tfr->data)[%d] != tfr->data_size[%d]",
+			strlen(tfr->data), tfr->data_size);
+		LOG_DBG("tfr->data = %s", tfr->data);
 		return -EINVAL;
 	}
 
@@ -249,12 +510,8 @@
 
 	argc--;  /* We pass in the device_size separate */
 	r = _clog_ctr(argc, argv, device_size);
-	if (!r) {
-		r = create_cluster_cpg(tfr->uuid);
 
-		if (r)
-			clog_dtr(tfr);
-	}
+	/* We join the CPG when we resume */
 
 	/* No returning data */
 	tfr->data_size = 0;
@@ -321,27 +578,151 @@
 	if (!lc)
 		return -EINVAL;
 
+	lc->resume_override = 0;
 	return 0;
 }
 
 /*
- * clog_resume
+ * _clog_resume
  * @tfr
  *
+ * Does the main work of resuming.
  */
-static int clog_resume(struct clog_tfr *tfr)
+static int _clog_resume(struct clog_tfr *tfr)
 {
+	uint32_t i;
 	struct log_c *lc = get_log(tfr->uuid);
+	size_t size = lc->bitset_uint32_count * sizeof(uint32_t);
 
 	if (!lc)
 		return -EINVAL;
 
-	if (lc->disk_fd != -1)
-		tfr->error = -ENOSYS;
+	if (lc->disk_fd == -1)
+		return 0;
+
+	switch (lc->resume_override) {
+	case 1000:
+		LOG_ERROR("ERROR:: Additional resume issued before suspend");
+		return 0;
+	case 0:
+		LOG_PRINT("Master resume: reading disk log");
+		lc->resume_override = 1000;
+		break;
+	case 1:
+		LOG_ERROR("Error:: partial bit loading (just sync_bits)");
+		return -EINVAL;
+	case 2:
+		LOG_ERROR("Error:: partial bit loading (just clean_bits)");
+		return -EINVAL;
+	case 3:
+		LOG_DBG("Non-master resume: bits pre-loaded");
+		lc->resume_override = 1000;
+		return 0;
+	default:
+		LOG_ERROR("Error:: multiple loading of bits (%d)", lc->resume_override);
+		return -EINVAL;
+	}
+
+	if (lc->log_dev_failed) {
+		LOG_ERROR("Log device has failed, unable to read bits");
+		tfr->error = 0;  /* We can handle this so far */
+		lc->disk_nr_regions = 0;
+	} else
+		tfr->error = read_log(lc);
+
+	switch (tfr->error) {
+	case 0:
+		if (lc->disk_nr_regions < lc->region_count)
+			LOG_PRINT("Mirror has grown, updating log bits");
+		else if (lc->disk_nr_regions > lc->region_count)
+			LOG_PRINT("Mirror has shrunk, updating log bits");
+		break;		
+	case -EINVAL:
+		LOG_DBG("Read log failed: not yet initialized");
+		lc->disk_nr_regions = 0;
+		break;
+	default:
+		LOG_ERROR("Failed to read disk log");
+		lc->disk_nr_regions = 0;
+		break;
+	}
+
+	/* If mirror has grown, set bits appropriately */
+	if (lc->sync == NOSYNC)
+		for (i = lc->disk_nr_regions; i < lc->region_count; i++)
+			log_set_bit(lc, lc->clean_bits, i);
+	else
+		for (i = lc->disk_nr_regions; i < lc->region_count; i++)
+			log_clear_bit(lc, lc->clean_bits, i);
+
+	/* Clear any old bits if device has shrunk */
+	for (i = lc->region_count; i % 32; i++)
+		log_clear_bit(lc, lc->clean_bits, i);
+
+	/* copy clean across to sync */
+	memcpy(lc->sync_bits, lc->clean_bits, size);
+	lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count);
+	lc->sync_search = 0;
+
+	/*
+	tfr->error = write_log(lc);
+	if (tfr->error) {
+		lc->log_dev_failed = 1;
+		LOG_ERROR("Failed to write initial disk log");
+	} else
+		lc->log_dev_failed = 0;
+	*/
+	/*
+	 * We mark 'touched' so that only the master commits
+	 * the log via 'commit_log'
+	 */
+	lc->touched = 1;
+
 	return tfr->error;
 }
 
 /*
+ * clog_resume
+ * @tfr
+ *
+ * If the log is pending, we must first join the cpg and
+ * put the log in the official list.
+ *
+ * If the log is in the official list, then we call
+ * _clog_resume.
+ */
+static int clog_resume(struct clog_tfr *tfr)
+{
+	int r;
+	struct log_c *lc = get_log(tfr->uuid);
+
+	if (!lc) {
+		/* Is the log in the pending list? */
+		lc = get_pending_log(tfr->uuid);
+		if (!lc) {
+			LOG_ERROR("clog_resume called on log that is not official or pending");
+			return -EINVAL;
+		}
+
+		/* Join the CPG */
+		r = create_cluster_cpg(tfr->uuid);
+		if (r) {
+			LOG_ERROR("clog_resume:  Failed to create cluster CPG");
+			return r;
+		}
+
+		/* move log to official list */
+		list_del_init(&lc->list);
+		list_add(&lc->list, &log_list);
+
+		return 0;
+	}
+
+	/* log is in the official list, try to resume */
+	return _clog_resume(tfr);
+}
+
+/*
  * clog_get_region_size
  * @tfr
  *
@@ -619,8 +1000,19 @@
 
 static int disk_status_info(struct log_c *lc, struct clog_tfr *tfr)
 {
-	tfr->error = -ENOSYS;
-	return -ENOSYS;
+	char *data = (char *)tfr->data;
+	struct stat statbuf;
+
+	if(fstat(lc->disk_fd, &statbuf)) {
+		tfr->error = -errno;
+		return -errno;
+	}
+
+	tfr->data_size = sprintf(data, "3 clustered_disk %d:%d %c",
+				 major(statbuf.st_rdev), minor(statbuf.st_rdev),
+				 'A'); /* FIXME: detect dead device */
+
+	return 0;
 }
 
 /*
@@ -649,9 +1041,9 @@
 	int params;
 	char *data = (char *)tfr->data;
 
-	params = (lc->sync == DEFAULTSYNC) ? 2 : 3;
-	tfr->data_size = sprintf(data, "clustered_core %d %u %sblock_on_error ",
-				 params, lc->region_size,
+	params = (lc->sync == DEFAULTSYNC) ? 3 : 4;
+	tfr->data_size = sprintf(data, "clustered_core %d %u %s %sblock_on_error ",
+				 params, lc->region_size, lc->uuid,
 				 (lc->sync == DEFAULTSYNC) ? "" :
 				 (lc->sync == NOSYNC) ? "nosync " : "sync ");
 	return 0;
@@ -659,8 +1051,23 @@
 
 static int disk_status_table(struct log_c *lc, struct clog_tfr *tfr)
 {
-	tfr->error = -ENOSYS;
-	return -ENOSYS;
+	int params;
+	char *data = (char *)tfr->data;
+	struct stat statbuf;
+
+	if(fstat(lc->disk_fd, &statbuf)) {
+		tfr->error = -errno;
+		return -errno;
+	}
+
+	params = (lc->sync == DEFAULTSYNC) ? 4 : 5;
+	tfr->data_size = sprintf(data, "clustered_disk %d %u %d:%d %s %sblock_on_error ",
+				 params, lc->region_size,
+				 major(statbuf.st_rdev), minor(statbuf.st_rdev),
+				 lc->uuid,
+				 (lc->sync == DEFAULTSYNC) ? "" :
+				 (lc->sync == NOSYNC) ? "nosync " : "sync ");
+	return 0;
 }
 
 /*
@@ -685,6 +1092,30 @@
 }
 
 /*
+ * clog_is_remote_recovering
+ * @tfr
+ *
+ */
+static int clog_is_remote_recovering(struct clog_tfr *tfr)
+{
+	int *rtn = (int *)tfr->data;
+	uint64_t region = *((uint64_t *)(tfr->data));
+	struct log_c *lc = get_log(tfr->uuid);
+
+	if (!lc)
+		return -EINVAL;
+
+	if (region > lc->region_count)
+		return -EINVAL;
+
+	*rtn = !log_test_bit(lc->sync_bits, region);
+	tfr->data_size = sizeof(*rtn);
+
+	return 0;	
+}
+
+
+/*
  * do_request
  * @tfr: the request
  *
@@ -757,7 +1188,12 @@
 	case DM_CLOG_STATUS_TABLE:
 		r = clog_status_table(tfr);
 		break;
+	case DM_CLOG_IS_REMOTE_RECOVERING:
+		r = clog_is_remote_recovering(tfr);
+		break;
 	default:
+		LOG_ERROR("Unknown request");
+		r = tfr->error = -EINVAL;
 		break;
 	}
 
@@ -805,9 +1241,12 @@
 		goto out;
 
 	if (lc->disk_fd >= 0) {
-		/* FIXME: implement */
-		tfr->error = -ENOSYS;
-		r = -ENOSYS;
+		r = tfr->error = write_log(lc);
+		if (r) {
+			LOG_ERROR("Error writing to disk log");
+			return -EIO;
+		}
+		LOG_DBG("Disk log written");
 	}
 
 	lc->touched = 0;
@@ -894,10 +1333,12 @@
 	}
 
 	if (!strncmp(which, "sync_bits", 9)) {
+		lc->resume_override += 1;
 		memcpy(lc->sync_bits, buf, bitset_size);
 		LOG_DBG("loading sync_bits:");
 		print_bits((char *)lc->sync_bits, bitset_size);
 	} else if (!strncmp(which, "clean_bits", 9)) {
+		lc->resume_override += 2;
 		memcpy(lc->clean_bits, buf, bitset_size);
 		LOG_DBG("loading clean_bits:");
 		print_bits((char *)lc->clean_bits, bitset_size);
@@ -913,6 +1354,7 @@
 	struct log_c *lc;
 
 	/* FIXME: Need prefetch to do this right */
+	LOG_DBG("Official log list:");
 	__list_for_each(l, &log_list) {
 		found = 1;
 		lc = list_entry(l, struct log_c, list);
@@ -924,5 +1366,17 @@
 		print_bits((char *)lc->clean_bits,
 			   lc->bitset_uint32_count * sizeof(*lc->clean_bits));
 	}
+	LOG_DBG("Pending log list:");
+	__list_for_each(l, &log_pending_list) {
+		found = 1;
+		lc = list_entry(l, struct log_c, list);
+		LOG_DBG("%s", lc->uuid);
+		LOG_DBG("sync_bits:");
+		print_bits((char *)lc->sync_bits,
+			   lc->bitset_uint32_count * sizeof(*lc->sync_bits));
+		LOG_DBG("clean_bits:");
+		print_bits((char *)lc->clean_bits,
+			   lc->bitset_uint32_count * sizeof(*lc->clean_bits));
+	}
 	return found;
 }
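
The disk-log support added to functions.c reuses the on-disk layout of the in-kernel dm-log: a log_header (magic "MiRr", version, region count) at the front of the log device, the clean-region bitmap starting at byte offset 1024, and the whole buffer rounded up to whole pages and allocated with posix_memalign() so it can be transferred with O_DIRECT. A self-contained sketch of that sizing and layout arithmetic follows (values and field widths follow the code above; it only prints the numbers rather than touching a device):

/*
 * Hedged sketch: buffer sizing and layout used by the new disk log code,
 * mirroring the arithmetic in _clog_ctr()/read_log()/write_log() above.
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define MIRROR_MAGIC        0x4D695272	/* "MiRr" */
#define MIRROR_DISK_VERSION 2
#define BITMAP_OFFSET       1024	/* bitmap lives 1K into the buffer */

struct log_header {
	uint32_t magic;
	uint32_t version;
	uint64_t nr_regions;
};

int main(void)
{
	uint32_t region_count = 1000;			/* example value */
	size_t page_size = sysconf(_SC_PAGESIZE);
	size_t bitset_size = region_count / 8 + ((region_count % 8) ? 1 : 0);
	size_t pages = bitset_size / page_size +
		       ((bitset_size % page_size) ? 1 : 0) + 1; /* +1 for header */
	size_t disk_size = pages * page_size;
	void *disk_buffer;
	struct log_header *lh;

	/* O_DIRECT I/O needs page-aligned memory, hence posix_memalign() */
	if (posix_memalign(&disk_buffer, page_size, disk_size))
		return 1;

	lh = disk_buffer;
	lh->magic = MIRROR_MAGIC;
	lh->version = MIRROR_DISK_VERSION;
	lh->nr_regions = region_count;

	printf("header at 0, bitmap at %d, %zu byte(s) of bits, "
	       "%zu byte buffer (%zu pages)\n",
	       BITMAP_OFFSET, bitset_size, disk_size, pages);

	free(disk_buffer);
	return 0;
}
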
--- cluster/cmirror/src/Attic/local.c	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/local.c	2007/11/03 18:37:48	1.1.2.2
@@ -164,6 +164,23 @@
 				  RQ_TYPE(tfr->request_type));
 			
 		break;
+	case DM_CLOG_RESUME:
+		/*
+		 * Resume is a special case that requires a local
+		 * component to join the CPG, and a cluster component
+		 * to handle the request.
+		 */
+		r = do_request(tfr);
+		if (r) {
+			LOG_DBG("Returning failed request to kernel [%s]",
+				RQ_TYPE(tfr->request_type));
+			r = kernel_send(tfr);
+			if (r)
+				LOG_ERROR("Failed to respond to kernel [%s]",
+					  RQ_TYPE(tfr->request_type));
+			break;
+		}
+		/* ELSE, fall through to default */
 	default:
 		/* Add before send_to_cluster, so cluster code can find it */
 		queue_add_tail(tfr, cluster_queue);
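
The local.c hunk above routes DM_CLOG_RESUME through do_request() locally first (so a pending log can join the CPG) and only falls through to the normal cluster path when that local step succeeds; a failure is reported straight back to the kernel. A minimal sketch of that local-first, fall-through shape (the function names here are placeholders, not the daemon's real helpers):

/*
 * Hedged sketch of the resume routing in local.c: handle the request
 * locally first, and only fall through to the cluster path on success.
 */
#include <stdio.h>

enum { DM_CLOG_RESUME = 1, DM_CLOG_FLUSH = 2 };

static int do_request_locally(int type)	{ printf("local: %d\n", type); return 0; }
static int send_to_cluster(int type)		{ printf("cluster: %d\n", type); return 0; }
static void reply_to_kernel(int type, int err)	{ printf("kernel: %d err=%d\n", type, err); }

static void route(int type)
{
	int r;

	switch (type) {
	case DM_CLOG_RESUME:
		r = do_request_locally(type);	/* e.g. join the CPG */
		if (r) {
			reply_to_kernel(type, r); /* fail the resume early */
			break;
		}
		/* fall through: the cluster must also see the resume */
	default:
		send_to_cluster(type);
	}
}

int main(void)
{
	route(DM_CLOG_RESUME);
	route(DM_CLOG_FLUSH);
	return 0;
}
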
--- cluster/cmirror-kernel/src/dm-clog-tfr.h	2007/08/23 19:54:57	1.1.2.1
+++ cluster/cmirror-kernel/src/dm-clog-tfr.h	2007/11/03 18:37:48	1.1.2.2
@@ -27,6 +27,7 @@
 #define DM_CLOG_GET_SYNC_COUNT        14
 #define DM_CLOG_STATUS_INFO           15
 #define DM_CLOG_STATUS_TABLE          16
+#define DM_CLOG_IS_REMOTE_RECOVERING  17
 
 #define RQ_TYPE(x) \
 	((x) == DM_CLOG_CTR) ? "DM_CLOG_CTR" : \
@@ -45,7 +46,8 @@
 	((x) == DM_CLOG_GET_SYNC_COUNT) ? "DM_CLOG_GET_SYNC_COUNT" : \
 	((x) == DM_CLOG_STATUS_INFO) ? "DM_CLOG_STATUS_INFO" : \
 	((x) == DM_CLOG_STATUS_TABLE) ? "DM_CLOG_STATUS_TABLE" : \
-	NULL
+	((x) == DM_CLOG_IS_REMOTE_RECOVERING) ? \
+	"DM_CLOG_IS_REMOTE_RECOVERING" : NULL
 
 struct clog_tfr {
 	uint64_t private[2];
--- cluster/cmirror-kernel/src/dm-clog.c	2007/08/30 18:26:22	1.2.2.3
+++ cluster/cmirror-kernel/src/dm-clog.c	2007/11/03 18:37:48	1.2.2.4
@@ -560,6 +560,20 @@
 	return DMLOG_IOERR_BLOCK;
 }
 
+static int cluster_is_remote_recovering(struct dirty_log *log, region_t region)
+{
+	int r;
+	int is_recovering;
+	int rdata_size;
+	struct log_c *lc = (struct log_c *)log->context;
+
+	rdata_size = sizeof(is_recovering);
+	r = dm_clog_consult_server(lc->uuid, DM_CLOG_IS_REMOTE_RECOVERING,
+				   (char *)&region, sizeof(region),
+				   (char *)&is_recovering, &rdata_size);
+	return (r) ? 1 : is_recovering;
+}
+
 static struct dirty_log_type _clustered_core_type = {
 	.name = "clustered_core",
 	.module = THIS_MODULE,
@@ -579,6 +593,7 @@
 	.get_sync_count = cluster_get_sync_count,
 	.status = cluster_status,
 	.get_failure_response = cluster_get_failure_response,
+	.is_remote_recovering = cluster_is_remote_recovering,
 };
 
 static struct dirty_log_type _clustered_disk_type = {
@@ -600,6 +615,7 @@
 	.get_sync_count = cluster_get_sync_count,
 	.status = cluster_status,
 	.get_failure_response = cluster_get_failure_response,
+	.is_remote_recovering = cluster_is_remote_recovering,
 };
 
 static int __init cluster_dirty_log_init(void)
@@ -638,7 +654,7 @@
 		return r;
 	}
 
-	DMINFO("dm-clulog (built %s %s) installed", __DATE__, __TIME__);
+	DMINFO("dm-log-clustered (built %s %s) installed", __DATE__, __TIME__);
 	return 0;
 }
 
@@ -648,7 +664,7 @@
 	dm_unregister_dirty_log_type(&_clustered_core_type);
 	dm_clog_tfr_exit();
 	mempool_destroy(flush_entry_pool);
-	DMINFO("dm-clulog (built %s %s) removed", __DATE__, __TIME__);
+	DMINFO("dm-log-clustered (built %s %s) removed", __DATE__, __TIME__);
 	return;
 }
 
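cluster_is_remote_recovering() above ships the region number to the userspace daemon via dm_clog_consult_server() and expects a single int back, treating any transfer error as 1 ("assume it is recovering") to stay on the safe side. On the daemon side, clog_is_remote_recovering() in functions.c answers with the inverse of the region's sync bit. A tiny standalone sketch of that daemon-side check (the 32-bit, LSB-first bit layout here is an assumption for illustration; the daemon uses its own log_test_bit() helper):

/*
 * Hedged sketch of the daemon-side DM_CLOG_IS_REMOTE_RECOVERING check:
 * a region is "remote recovering" if its sync bit is not yet set.
 */
#include <stdint.h>
#include <stdio.h>

static int test_bit(const uint32_t *bits, uint64_t bit)
{
	return (bits[bit / 32] >> (bit % 32)) & 1;
}

int main(void)
{
	uint32_t sync_bits[2] = { 0x0000000F, 0x0 };	/* regions 0-3 in sync */
	uint64_t region;

	for (region = 0; region < 6; region++)
		printf("region %llu: is_remote_recovering = %d\n",
		       (unsigned long long)region, !test_bit(sync_bits, region));
	return 0;
}
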

