[Cluster-devel] cluster/group/gfs_controld Makefile cpg.c lock ...

teigland at sourceware.org
Mon Jul 31 18:37:08 UTC 2006


CVSROOT:	/cvs/cluster
Module name:	cluster
Changes by:	teigland at sourceware.org	2006-07-31 18:37:07

Modified files:
	group/gfs_controld: Makefile cpg.c lock_dlm.h main.c plock.c 
	                    recover.c 

Log message:
	- use nodeid and owner when checking the owner of a plock instead of
	just the pid (see the sketch after this list)
	- this requires the recent addition of an owner field to the struct
	in the lock_dlm_plock.h kernel header
	- add the ability to dump all plocks to a client (group_tool) for
	display (an example of the output appears after the last diff)
	- add new code that uses the SA CKPT service to synchronize all the
	plock state for the group to a new node that joins the group; this
	is currently disabled until it has been tested and debugged (an
	outline of the call sequence appears after the patch links)
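
	The core of the change is a single predicate.  The sketch below
	summarizes the new ownership test applied throughout plock.c
	(same_owner is a hypothetical helper name; the fields are those of
	the posix_lock and gdlm_plock_info structs in the diffs).  A pid is
	only unique within one node, so two lock records belong to the same
	holder only when both the nodeid and the owner match:

	static int same_owner(struct posix_lock *po,
			      struct gdlm_plock_info *in)
	{
		return po->nodeid == in->nodeid && po->owner == in->owner;
	}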

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/gfs_controld/Makefile.diff?cvsroot=cluster&r1=1.5&r2=1.6
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/gfs_controld/cpg.c.diff?cvsroot=cluster&r1=1.4&r2=1.5
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/gfs_controld/lock_dlm.h.diff?cvsroot=cluster&r1=1.7&r2=1.8
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/gfs_controld/main.c.diff?cvsroot=cluster&r1=1.6&r2=1.7
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/gfs_controld/plock.c.diff?cvsroot=cluster&r1=1.2&r2=1.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/gfs_controld/recover.c.diff?cvsroot=cluster&r1=1.4&r2=1.5
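
For readers unfamiliar with the SA Forum checkpoint (CKPT) API, the
synchronization added in plock.c reduces to the call sequence below.
This is a distilled outline of the code in the diffs, not a runnable
program; attribute setup, SA_AIS_ERR_TRY_AGAIN retries and error
handling are omitted.

	/* writer: the low finished nodeid runs store_plocks() */
	saCkptCheckpointOpen(ckpt_handle, &name, &attr,
			     SA_CKPT_CHECKPOINT_CREATE |
			     SA_CKPT_CHECKPOINT_READ |
			     SA_CKPT_CHECKPOINT_WRITE, 0, &h);
	for each resource r:			/* one section per resource */
		pack_section_buf(mg, r);	/* locks + waiters */
		saCkptSectionCreate(h, &section_attr, &section_buf,
				    section_len);

	/* reader: the joining node runs retrieve_plocks() */
	saCkptCheckpointOpen(ckpt_handle, &name, NULL,
			     SA_CKPT_CHECKPOINT_READ, 0, &h);
	saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, 0, &itr);
	while (saCkptSectionIterationNext(itr, &desc) == SA_AIS_OK) {
		saCkptCheckpointRead(h, &iov, 1, NULL);
		unpack_section_buf(mg, desc.sectionId.id,
				   desc.sectionId.idLen);
	}
	saCkptSectionIterationFinalize(itr);
	saCkptCheckpointClose(h);

Each section id is the resource number printed as a decimal string, and
each section body is an array of struct pack_plock entries.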

--- cluster/group/gfs_controld/Makefile	2006/07/07 16:53:26	1.5
+++ cluster/group/gfs_controld/Makefile	2006/07/31 18:37:07	1.6
@@ -38,7 +38,7 @@
 		plock.o \
 		recover.o \
 		../lib/libgroup.a
-	$(CC) $(LDFLAGS) -o $@ $^ -lcman -lcpg 
+	$(CC) $(LDFLAGS) -o $@ $^ -lcman -lcpg -lSaCkpt
 
 
 main.o: main.c
--- cluster/group/gfs_controld/cpg.c	2006/06/30 15:35:23	1.4
+++ cluster/group/gfs_controld/cpg.c	2006/07/31 18:37:07	1.5
@@ -38,8 +38,11 @@
 	hd = (struct gdlm_header *) data;
 
 	mg = find_mg(hd->name);
-	if (!mg)
+	if (!mg) {
+		log_error("cpg message from %d len %d no group %s",
+			  nodeid, len, hd->name);
 		return;
+	}
 
 	hd->version[0]	= le16_to_cpu(hd->version[0]);
 	hd->version[1]	= le16_to_cpu(hd->version[1]);
@@ -152,8 +155,10 @@
 	}
 
 	cpg_fd_get(daemon_handle, &fd);
-	if (fd < 0)
+	if (fd < 0) {
+		log_error("cpg_fd_get error, fd %d", fd);
 		return -1;
+	}
 
 	memset(&daemon_name, 0, sizeof(daemon_name));
 	strcpy(daemon_name.value, "gfs_controld");
@@ -187,15 +192,17 @@
 
  retry:
 	error = cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1);
-	if (error != CPG_OK)
-		log_error("cpg_mcast_joined error %d handle %llx", error, h);
 	if (error == CPG_ERR_TRY_AGAIN) {
-		/* FIXME: backoff say .25 sec, .5 sec, .75 sec, 1 sec */
+		log_debug("cpg_mcast_joined error %d", error);
 		retries++;
 		if (retries > 3)
 			sleep(1);
 		goto retry;
 	}
+	if (error != CPG_OK) {
+		log_error("cpg_mcast_joined error %d handle %llx", error, h);
+		return -1;
+	}
 
 	return 0;
 }
--- cluster/group/gfs_controld/lock_dlm.h	2006/07/20 20:19:44	1.7
+++ cluster/group/gfs_controld/lock_dlm.h	2006/07/31 18:37:07	1.8
@@ -141,6 +141,10 @@
 	int			wait_first_done;
 	int			low_finished_nodeid;
 
+	uint64_t		cp_handle;
+	time_t			last_checkpoint_time;
+	time_t			last_plock_time;
+
 	int			needs_recovery;
 	int			our_jid;
 	int			spectator;
@@ -246,4 +250,8 @@
 
 int send_group_message(struct mountgroup *mg, int len, char *buf);
 
+void store_plocks(struct mountgroup *mg);
+void retrieve_plocks(struct mountgroup *mg);
+int dump_plocks(char *name, int fd);
+
 #endif
--- cluster/group/gfs_controld/main.c	2006/07/20 20:19:44	1.6
+++ cluster/group/gfs_controld/main.c	2006/07/31 18:37:07	1.7
@@ -125,7 +125,7 @@
 	return write(client[ci].fd, buf, len);
 }
 
-static int do_dump(int ci)
+static int dump_debug(int ci)
 {
 	int rv, len;
 
@@ -182,7 +182,11 @@
 	else if (!strcmp(cmd, "remount"))
 		rv = do_remount(ci, dir, argv[3]);
 	else if (!strcmp(cmd, "dump")) {
-		do_dump(ci);
+		dump_debug(ci);
+		return 0;
+	} else if (!strcmp(cmd, "plocks")) {
+		dump_plocks(dir, client[ci].fd);
+		client_dead(ci);
 		return 0;
 	} else
 		rv = -EINVAL;
--- cluster/group/gfs_controld/plock.c	2006/07/19 14:44:40	1.2
+++ cluster/group/gfs_controld/plock.c	2006/07/31 18:37:07	1.3
@@ -31,6 +31,8 @@
 #include <netdb.h>
 #include <limits.h>
 #include <unistd.h>
+#include <openais/saAis.h>
+#include <openais/saCkpt.h>
 #include <linux/lock_dlm_plock.h>
 
 #include "lock_dlm.h"
@@ -43,6 +45,25 @@
 
 static int control_fd = -1;
 extern int our_nodeid;
+static int plocks_online = 0;
+
+static SaCkptHandleT ckpt_handle;
+static SaCkptCallbacksT callbacks = { 0, 0 };
+static SaVersionT version = { 'B', 1, 1 };
+static char section_buf[1024 * 1024];
+static uint32_t section_len;
+
+struct pack_plock {
+	uint64_t start;
+	uint64_t end;
+	uint64_t owner;
+	uint32_t pid;
+	uint32_t nodeid;
+	uint8_t ex;
+	uint8_t waiter;
+	uint16_t pad1;
+	uint32_t pad;
+};
 
 struct resource {
 	struct list_head	list;	   /* list of resources */
@@ -54,9 +75,11 @@
 struct posix_lock {
 	struct list_head	list;	   /* resource locks or waiters list */
 	uint32_t		pid;
+	uint64_t		owner;
 	uint64_t		start;
 	uint64_t		end;
 	int			ex;
+	int			nodeid;
 };
 
 struct lock_waiter {
@@ -194,8 +217,18 @@
 
 int setup_plocks(void)
 {
+	SaAisErrorT err;
 	int rv;
 
+	err = saCkptInitialize(&ckpt_handle, &callbacks, &version);
+	if (err == SA_AIS_OK)
+		plocks_online = 1;
+	else
+		log_error("ckpt init error %d - plocks unavailable", err);
+
+	/* REMOVEME: disable actual use of checkpoints for now */
+	plocks_online = 0;
+
 	rv = open_control();
 	if (rv)
 		return rv;
@@ -222,6 +255,7 @@
 
 	mg = find_mg_id(info.fsid);
 	if (!mg) {
+		log_debug("process_plocks: no mg id %x", info.fsid);
 		rv = -EEXIST;
 		goto fail;
 	}
@@ -232,13 +266,16 @@
 		rv = -ENOMEM;
 		goto fail;
 	}
+	memset(buf, 0, len);
+
+	info.nodeid = our_nodeid;
 
 	/* FIXME: do byte swapping */
 
 	hd = (struct gdlm_header *)buf;
 	hd->type = MSG_PLOCK;
 	hd->nodeid = our_nodeid;
-	hd->to_nodeid = 0;   /* to all */
+	hd->to_nodeid = 0;
 	memcpy(buf + sizeof(struct gdlm_header), &info, sizeof(info));
 
 	rv = send_group_message(mg, len, buf);
@@ -418,7 +455,7 @@
 	struct posix_lock *po;
 
 	list_for_each_entry(po, &r->locks, list) {
-		if (po->pid == in->pid)
+		if (po->nodeid == in->nodeid && po->owner == in->owner)
 			continue;
 		if (!ranges_overlap(po->start, po->end, in->start, in->end))
 			continue;
@@ -429,8 +466,8 @@
 	return 0;
 }
 
-static int add_lock(struct resource *r, uint32_t pid, int ex,
-		    uint64_t start, uint64_t end)
+static int add_lock(struct resource *r, uint32_t nodeid, uint64_t owner,
+		    uint32_t pid, int ex, uint64_t start, uint64_t end)
 {
 	struct posix_lock *po;
 
@@ -441,6 +478,8 @@
 
 	po->start = start;
 	po->end = end;
+	po->nodeid = nodeid;
+	po->owner = owner;
 	po->pid = pid;
 	po->ex = ex;
 	list_add_tail(&po->list, &r->locks);
@@ -466,7 +505,7 @@
 	po->end = in->end;
 	po->ex = in->ex;
 
-	add_lock(r, in->pid, !in->ex, start2, end2);
+	add_lock(r, in->nodeid, in->owner, in->pid, !in->ex, start2, end2);
 
 	return 0;
 }
@@ -480,8 +519,11 @@
 		      struct gdlm_plock_info *in)
 
 {
-	add_lock(r, in->pid, !in->ex, po->start, in->start - 1);
-	add_lock(r, in->pid, !in->ex, in->end + 1, po->end);
+	add_lock(r, in->nodeid, in->owner, in->pid,
+		 !in->ex, po->start, in->start - 1);
+
+	add_lock(r, in->nodeid, in->owner, in->pid,
+		 !in->ex, in->end + 1, po->end);
 
 	po->start = in->start;
 	po->end = in->end;
@@ -497,7 +539,7 @@
 	int rv = 0;
 
 	list_for_each_entry_safe(po, safe, &r->locks, list) {
-		if (po->pid != in->pid)
+		if (po->nodeid != in->nodeid || po->owner != in->owner)
 			continue;
 		if (!ranges_overlap(po->start, po->end, in->start, in->end))
 			continue;
@@ -546,7 +588,8 @@
 		}
 	}
 
-	rv = add_lock(r, in->pid, in->ex, in->start, in->end);
+	rv = add_lock(r, in->nodeid, in->owner, in->pid,
+		      in->ex, in->start, in->end);
 
  out:
 	return rv;
@@ -560,7 +603,7 @@
 	int rv = 0;
 
 	list_for_each_entry_safe(po, safe, &r->locks, list) {
-		if (po->pid != in->pid)
+		if (po->nodeid != in->nodeid || po->owner != in->owner)
 			continue;
 		if (!ranges_overlap(po->start, po->end, in->start, in->end))
 			continue;
@@ -587,7 +630,8 @@
 			/* RN within RE - shrink and update RE to be front
 			 * fragment, and add a new lock for back fragment */
 
-			add_lock(r, in->pid, po->ex, in->end + 1, po->end);
+			add_lock(r, in->nodeid, in->owner, in->pid,
+				 po->ex, in->end + 1, po->end);
 			po->end = in->start - 1;
 			goto out;
 
@@ -704,12 +748,15 @@
 
 	memcpy(&info, buf + sizeof(struct gdlm_header), sizeof(info));
 
-	log_group(mg, "receive_plock %d op %d fs %x num %llx ex %d wait %d",
+	/* FIXME: do byte swapping */
+
+	log_group(mg, "receive_plock from %d op %d fs %x num %llx ex %d w %d",
 		  from, info.optype, info.fsid, info.number, info.ex,
 		  info.wait);
 
-	if (from != hd->nodeid) {
-		log_error("receive_plock from %d vs %d", from, hd->nodeid);
+	if (from != hd->nodeid || from != info.nodeid) {
+		log_error("receive_plock from %d header %d info %d",
+			  from, hd->nodeid, info.nodeid);
 		rv = -EINVAL;
 		goto out;
 	}
@@ -719,9 +766,11 @@
 
 	switch (info.optype) {
 	case GDLM_PLOCK_OP_LOCK:
+		mg->last_plock_time = time(NULL);
 		rv = do_lock(mg, &info);
 		break;
 	case GDLM_PLOCK_OP_UNLOCK:
+		mg->last_plock_time = time(NULL);
 		rv = do_unlock(mg, &info);
 		break;
 	case GDLM_PLOCK_OP_GET:
@@ -738,3 +787,341 @@
 	}
 }
 
+void plock_exit(void)
+{
+	if (plocks_online)
+		saCkptFinalize(ckpt_handle);
+}
+
+void pack_section_buf(struct mountgroup *mg, struct resource *r)
+{
+	struct pack_plock *pp;
+	struct posix_lock *po;
+	struct lock_waiter *w;
+	int count = 0;
+
+	memset(&section_buf, 0, sizeof(section_buf));
+
+	pp = (struct pack_plock *) &section_buf;
+
+	list_for_each_entry(po, &r->locks, list) {
+		pp->start	= po->start;
+		pp->end		= po->end;
+		pp->owner	= po->owner;
+		pp->pid		= po->pid;
+		pp->nodeid	= po->nodeid;
+		pp->ex		= po->ex;
+		pp->waiter	= 0;
+		pp++;
+		count++;
+	}
+
+	list_for_each_entry(w, &r->waiters, list) {
+		pp->start	= w->info.start;
+		pp->end		= w->info.end;
+		pp->owner	= w->info.owner;
+		pp->pid		= w->info.pid;
+		pp->nodeid	= w->info.nodeid;
+		pp->ex		= w->info.ex;
+		pp->waiter	= 1;
+		pp++;
+		count++;
+	}
+
+	section_len = count * sizeof(struct pack_plock);
+
+	log_group(mg, "pack %llx count %d", r->number, count);
+}
+
+int unpack_section_buf(struct mountgroup *mg, char *numbuf, int buflen)
+{
+	struct pack_plock *pp;
+	struct posix_lock *po;
+	struct lock_waiter *w;
+	struct resource *r;
+	int count = section_len / sizeof(struct pack_plock);
+	int i;
+
+	r = malloc(sizeof(struct resource));
+	if (!r)
+		return -ENOMEM;
+	memset(r, 0, sizeof(struct resource));
+
+	sscanf(numbuf, "%llu", &r->number);
+
+	log_group(mg, "unpack %llx count %d", r->number, count);
+
+	pp = (struct pack_plock *) &section_buf;
+
+	for (i = 0; i < count; i++) {
+		if (!pp->waiter) {
+			po = malloc(sizeof(struct posix_lock));
+			po->start	= pp->start;
+			po->end		= pp->end;
+			po->owner	= pp->owner;
+			po->pid		= pp->pid;
+			po->nodeid	= pp->nodeid;
+			po->ex		= pp->ex;
+			list_add_tail(&po->list, &r->locks);
+		} else {
+			w = malloc(sizeof(struct lock_waiter));
+			w->info.start	= pp->start;
+			w->info.end	= pp->end;
+			w->info.owner	= pp->owner;
+			w->info.pid	= pp->pid;
+			w->info.nodeid	= pp->nodeid;
+			w->info.ex	= pp->ex;
+			list_add_tail(&w->list, &r->waiters);
+		}
+		pp++;
+	}
+
+	list_add_tail(&r->list, &mg->resources);
+	return 0;
+}
+
+/* copy all plock state into a checkpoint so new node can retrieve it */
+
+void store_plocks(struct mountgroup *mg)
+{
+	SaCkptCheckpointCreationAttributesT attr;
+	SaCkptCheckpointHandleT h;
+	SaCkptSectionIdT section_id;
+	SaCkptSectionCreationAttributesT section_attr;
+	SaNameT name;
+	SaAisErrorT rv;
+	char buf[32];
+	struct resource *r;
+	struct posix_lock *po;
+	struct lock_waiter *w;
+	int len, r_count, total_size, section_size, max_section_size;
+
+	if (!plocks_online)
+		return;
+
+	/* no change to plock state since we created the last checkpoint */
+	if (mg->last_checkpoint_time > mg->last_plock_time) {
+		log_group(mg, "store_plocks: ckpt uptodate");
+		return;
+	}
+	mg->last_checkpoint_time = time(NULL);
+
+	len = snprintf(name.value, SA_MAX_NAME_LENGTH, "gfsplock.%s", mg->name);
+	name.length = len;
+
+	/* unlink an old checkpoint before we create a new one */
+	if (mg->cp_handle) {
+		log_group(mg, "store_plocks: unlink ckpt");
+		h = (SaCkptCheckpointHandleT) mg->cp_handle;
+		rv = saCkptCheckpointUnlink(h, &name);
+		if (rv != SA_AIS_OK)
+			log_error("ckpt unlink error %d %s", rv, mg->name);
+		h = 0;
+		mg->cp_handle = 0;
+	}
+
+	/* loop through all plocks to figure out sizes to set in
+	   the attr fields */
+
+	r_count = 0;
+	total_size = 0;
+	max_section_size = 0;
+
+	list_for_each_entry(r, &mg->resources, list) {
+		r_count++;
+		section_size = 0;
+		list_for_each_entry(po, &r->locks, list)
+			section_size += sizeof(struct pack_plock);
+		list_for_each_entry(w, &r->waiters, list)
+			section_size += sizeof(struct pack_plock);
+		total_size += section_size;
+		if (section_size > max_section_size)
+			max_section_size = section_size;
+	}
+
+	log_group(mg, "store_plocks: r_count %d total %d max_section %d",
+		  r_count, total_size, max_section_size);
+
+	attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
+	attr.checkpointSize = total_size;
+	attr.retentionDuration = SA_TIME_MAX;
+	attr.maxSections = r_count;
+	attr.maxSectionSize = max_section_size;
+	attr.maxSectionIdSize = 21;             /* 20 digits in max uint64 */
+
+ open_retry:
+	rv = saCkptCheckpointOpen(ckpt_handle, &name, &attr,
+				  SA_CKPT_CHECKPOINT_CREATE |
+				  SA_CKPT_CHECKPOINT_READ |
+				  SA_CKPT_CHECKPOINT_WRITE,
+				  0, &h);
+	if (rv == SA_AIS_ERR_TRY_AGAIN) {
+		log_group(mg, "store_plocks: ckpt open retry");
+		sleep(1);
+		goto open_retry;
+	}
+	if (rv != SA_AIS_OK) {
+		log_error("store_plocks: ckpt open error %d %s", rv, mg->name);
+		return;
+	}
+
+	mg->cp_handle = (uint64_t) h;
+
+	list_for_each_entry(r, &mg->resources, list) {
+		memset(&buf, 0, 32);
+		len = snprintf(buf, 32, "%llu", r->number);
+
+		section_id.id = buf;
+		section_id.idLen = len + 1;
+		section_attr.sectionId = &section_id;
+		section_attr.expirationTime = SA_TIME_END;
+
+		pack_section_buf(mg, r);
+
+ create_retry:
+		rv = saCkptSectionCreate(h, &section_attr, &section_buf,
+					 section_len);
+		if (rv == SA_AIS_ERR_TRY_AGAIN) {
+			log_group(mg, "store_plocks: ckpt create retry");
+			sleep(1);
+			goto create_retry;
+		}
+		if (rv != SA_AIS_OK) {
+			log_error("store_plocks: ckpt create error %d %s",
+				  rv, mg->name);
+			break;
+		}
+	}
+}
+
+/* called by a node that's just been added to the group to get existing plock
+   state */
+
+void retrieve_plocks(struct mountgroup *mg)
+{
+	SaCkptCheckpointHandleT h;
+	SaCkptSectionIterationHandleT itr;
+	SaCkptSectionDescriptorT desc;
+	SaCkptIOVectorElementT iov;
+	SaNameT name;
+	SaAisErrorT rv;
+	int len;
+
+	if (!plocks_online)
+		return;
+
+	len = snprintf(name.value, SA_MAX_NAME_LENGTH, "gfsplock.%s", mg->name);
+	name.length = len;
+
+ open_retry:
+	rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
+				  SA_CKPT_CHECKPOINT_READ, 0, &h);
+	if (rv == SA_AIS_ERR_TRY_AGAIN) {
+		log_group(mg, "retrieve_plocks: ckpt open retry");
+		sleep(1);
+		goto open_retry;
+	}
+	if (rv != SA_AIS_OK) {
+		log_error("retrieve_plocks: ckpt open error %d %s",
+			  rv, mg->name);
+		return;
+	}
+
+ init_retry:
+	rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, 0, &itr);
+	if (rv == SA_AIS_ERR_TRY_AGAIN) {
+		log_group(mg, "retrieve_plocks: ckpt iterinit retry");
+		sleep(1);
+		goto init_retry;
+	}
+	if (rv != SA_AIS_OK) {
+		log_error("retrieve_plocks: ckpt iterinit error %d %s",
+			  rv, mg->name);
+		return;
+	}
+
+	while (1) {
+ next_retry:
+		rv = saCkptSectionIterationNext(itr, &desc);
+		if (rv == SA_AIS_ERR_NO_SECTIONS)
+			break;
+		if (rv == SA_AIS_ERR_TRY_AGAIN) {
+			log_group(mg, "retrieve_plocks: ckpt iternext retry");
+			sleep(1);
+			goto next_retry;
+		}
+		if (rv != SA_AIS_OK) {
+			log_error("retrieve_plocks: ckpt iternext error %d %s",
+				  rv, mg->name);
+			break;
+		}
+
+		iov.sectionId = desc.sectionId;
+		iov.dataBuffer = &section_buf;
+		iov.dataSize = desc.sectionSize;
+		iov.dataOffset = 0;
+
+ read_retry:
+		rv = saCkptCheckpointRead(h, &iov, 1, NULL);
+		if (rv == SA_AIS_ERR_TRY_AGAIN) {
+			log_group(mg, "retrieve_plocks: ckpt read retry");
+			sleep(1);
+			goto read_retry;
+		}
+		if (rv != SA_AIS_OK) {
+			log_error("retrieve_plocks: ckpt read error %d %s",
+				  rv, mg->name);
+			break;
+		}
+
+		unpack_section_buf(mg, desc.sectionId.id, desc.sectionId.idLen);
+	}
+
+	saCkptSectionIterationFinalize(itr);
+	saCkptCheckpointClose(h);
+}
+
+int dump_plocks(char *name, int fd)
+{
+	struct mountgroup *mg;
+	struct posix_lock *po;
+	struct lock_waiter *w;
+	struct resource *r;
+	char line[MAXLINE];
+	int rv;
+
+	if (!name)
+		return -1;
+
+	mg = find_mg(name);
+	if (!mg)
+		return -1;
+
+	list_for_each_entry(r, &mg->resources, list) {
+
+		list_for_each_entry(po, &r->locks, list) {
+			snprintf(line, MAXLINE,
+			      "%llu %s %llu-%llu nodeid %d pid %u owner %llx\n",
+			      r->number,
+			      po->ex ? "WR" : "RD",
+			      po->start, po->end,
+			      po->nodeid, po->pid, po->owner);
+
+			rv = write(fd, line, strlen(line));
+		}
+
+		list_for_each_entry(w, &r->waiters, list) {
+			snprintf(line, MAXLINE,
+			      "%llu WAITING %s %llu-%llu nodeid %d pid %u owner %llx\n",
+			      r->number,
+			      w->info.ex ? "WR" : "RD",
+			      w->info.start, w->info.end,
+			      w->info.nodeid, w->info.pid, w->info.owner);
+
+			rv = write(fd, line, strlen(line));
+		}
+	}
+
+	return 0;
+}
+
--- cluster/group/gfs_controld/recover.c	2006/07/20 20:19:44	1.4
+++ cluster/group/gfs_controld/recover.c	2006/07/31 18:37:07	1.5
@@ -602,6 +602,7 @@
 	   so the second node won't mount the fs until omm. */
 
 	if (mg->low_finished_nodeid == our_nodeid) {
+		store_plocks(mg);
 		if (mg->first_mounter && !mg->first_mounter_done) {
 			log_group(mg, "delay sending journals to %d",
 				  new->nodeid);
@@ -1781,6 +1782,9 @@
 		mg->first_mounter = 1;
 		mg->first_mounter_done = 0;
 	}
+
+	retrieve_plocks(mg);
+	/* process_saved_plocks(mg); */
  out:
 	notify_mount_client(mg);
 }
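
For illustration, a "plocks" request from group_tool now returns one
line per held lock and per waiter in the format produced by
dump_plocks() above (the values here are made up):

	7934 WR 0-1023 nodeid 2 pid 3561 owner b75e2000
	7934 WAITING RD 0-1023 nodeid 1 pid 9037 owner f21a4000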



