[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Linux-cluster] [gfs_controld] send messages through separate cpg



[new process requires all work to be sent to ml prior to cvs check-in]

Set up a separate cpg for sending messages (e.g. for processing
mount/unmount) instead of sending them through the cpg used to represent
the mount group.  Since we apply cpg membership changes to the mount group
asynchronously, that cpg won't always contain all the nodes that need to
take part in processing a mount/unmount.  Without this, a mount on one
node in parallel with an unmount on another often fails.


diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/Makefile cluster/gfs/lock_dlm/daemon/Makefile
--- cluster-HEAD/gfs/lock_dlm/daemon/Makefile	2006-03-27 01:31:46.000000000 -0600
+++ cluster/gfs/lock_dlm/daemon/Makefile	2006-06-06 17:19:40.740421037 -0500
@@ -21,6 +21,7 @@
 	-I../../include/ \
 	-I../../../group/lib/ \
 	-I../../../cman/lib/ \
+	-I../../../cman/daemon/openais/trunk/include/ \
 	-I../../../dlm/lib/ \
 	-I../../../gfs-kernel/src/dlm/
 
@@ -33,12 +34,14 @@
 
 gfs_controld: 	main.o \
 		member_cman.o \
+		cpg.o \
 		group.o \
 		plock.o \
 		recover.o \
 		withdraw.o \
 		../../../dlm/lib/libdlm_lt.a \
 		../../../cman/lib/libcman.a \
+		../../../cman/daemon/openais/trunk/lib/libcpg.a \
 		../../../group/lib/libgroup.a
 	$(CC) $(LDFLAGS) -o $@ $^
 
@@ -49,6 +52,9 @@
 member_cman.o: member_cman.c
 	$(CC) $(CFLAGS) -c -o $@ $<
 
+cpg.o: cpg.c
+	$(CC) $(CFLAGS) -c -o $@ $<
+
 recover.o: recover.c
 	$(CC) $(CFLAGS) -c -o $@ $<
 
diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/cpg.c cluster/gfs/lock_dlm/daemon/cpg.c
--- cluster-HEAD/gfs/lock_dlm/daemon/cpg.c	1969-12-31 18:00:00.000000000 -0600
+++ cluster/gfs/lock_dlm/daemon/cpg.c	2006-06-07 11:54:28.478585576 -0500
@@ -0,0 +1,277 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2006 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include "lock_dlm.h"
+#include "cpg.h"
+
+/* Handle and name of the daemon-wide cpg that carries all messages. */
+static cpg_handle_t	daemon_handle;
+static struct cpg_name	daemon_name;
+
+/* One-message hand-off from deliver_cb() to process_cpg(). */
+static int		got_msg;
+static int		saved_nodeid;
+static int		saved_len;
+static char		saved_data[MAX_MSGLEN];
+
+void receive_journals(struct mountgroup *mg, char *buf, int len, int from);
+void receive_options(struct mountgroup *mg, char *buf, int len, int from);
+void receive_remount(struct mountgroup *mg, char *buf, int len, int from);
+void receive_plock(struct mountgroup *mg, char *buf, int len, int from);
+void receive_recovery_status(struct mountgroup *mg, char *buf, int len,
+			     int from);
+void receive_recovery_done(struct mountgroup *mg, char *buf, int len, int from);
+
+
+/* Route one saved message to the receive_*() handler for its type.
+   nodeid is the sender reported by cpg; data/len is the raw message,
+   which starts with a little-endian struct gdlm_header. */
+
+static void do_deliver(int nodeid, char *data, int len)
+{
+	struct mountgroup *mg;
+	struct gdlm_header *hd;
+	char name[MAXNAME + 1];
+
+	/* Nothing below is safe to read unless a whole header arrived. */
+	if (len < (int) sizeof(struct gdlm_header)) {
+		log_error("short message len %d from %d", len, nodeid);
+		return;
+	}
+
+	hd = (struct gdlm_header *) data;
+
+	/* A name that fills the whole field arrives without a terminator,
+	   so look the mountgroup up through a terminated copy. */
+	memcpy(name, hd->name, MAXNAME);
+	name[MAXNAME] = '\0';
+
+	mg = find_mg(name);
+	if (!mg)
+		return;
+
+	hd->version[0]	= le16_to_cpu(hd->version[0]);
+	hd->version[1]	= le16_to_cpu(hd->version[1]);
+	hd->version[2]	= le16_to_cpu(hd->version[2]);
+	hd->type	= le16_to_cpu(hd->type);
+	hd->nodeid	= le32_to_cpu(hd->nodeid);
+	hd->to_nodeid	= le32_to_cpu(hd->to_nodeid);
+
+	if (hd->version[0] != GDLM_VER_MAJOR) {
+		log_error("reject message version %u.%u.%u",
+			  hd->version[0], hd->version[1], hd->version[2]);
+		return;
+	}
+
+	/* If there are some group messages between a new node being added to
+	   the cpg group and being added to the app group, the new node should
+	   discard them since they're only relevant to the app group. */
+
+	if (!mg->last_callback) {
+		log_group(mg, "discard message type %d len %d from %d",
+			  hd->type, len, nodeid);
+		return;
+	}
+
+	switch (hd->type) {
+	case MSG_JOURNAL:
+		receive_journals(mg, data, len, nodeid);
+		break;
+
+	case MSG_OPTIONS:
+		receive_options(mg, data, len, nodeid);
+		break;
+
+	case MSG_REMOUNT:
+		receive_remount(mg, data, len, nodeid);
+		break;
+
+	case MSG_PLOCK:
+		receive_plock(mg, data, len, nodeid);
+		break;
+
+	case MSG_RECOVERY_STATUS:
+		receive_recovery_status(mg, data, len, nodeid);
+		break;
+
+	case MSG_RECOVERY_DONE:
+		receive_recovery_done(mg, data, len, nodeid);
+		break;
+
+	default:
+		log_error("unknown message type %d from %d",
+			  hd->type, hd->nodeid);
+	}
+}
+
+/* cpg delivery callback, run from inside cpg_dispatch(): save the message
+   and its sender so process_cpg() can handle it after dispatch returns. */
+
+void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name,
+		uint32_t nodeid, uint32_t pid, void *data, int data_len)
+{
+	/* saved_data is a fixed buffer; copying more would overrun it. */
+	if (data_len < 0 || data_len > (int) sizeof(saved_data)) {
+		log_error("drop message len %d from %u",
+			  data_len, nodeid);
+		return;
+	}
+
+	saved_nodeid = nodeid;
+	saved_len = data_len;
+	memcpy(saved_data, data, data_len);
+	got_msg = 1;
+}
+
+/* cpg membership callback; changes to the daemon cpg are not acted on. */
+
+void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
+		struct cpg_address *member_list, int member_list_entries,
+		struct cpg_address *left_list, int left_list_entries,
+		struct cpg_address *joined_list, int joined_list_entries)
+{
+}
+
+static cpg_callbacks_t callbacks = {
+	.cpg_deliver_fn = deliver_cb,
+	.cpg_confchg_fn = confchg_cb,
+};
+
+/* Dispatch one pending cpg event and, if it delivered a message, process
+   it.  Returns 0 on success, -1 if cpg_dispatch() fails. */
+
+int process_cpg(void)
+{
+	cpg_error_t error;
+
+	got_msg = 0;
+	saved_len = 0;
+	saved_nodeid = 0;
+	memset(saved_data, 0, sizeof(saved_data));
+
+	error = cpg_dispatch(daemon_handle, CPG_DISPATCH_ONE);
+	if (error != CPG_OK) {
+		log_error("cpg_dispatch error %d", error);
+		return -1;
+	}
+
+	if (got_msg)
+		do_deliver(saved_nodeid, saved_data, saved_len);
+	return 0;
+}
+
+/* Connect to cpg and join the "gfs_controld" group.
+   Returns the fd to poll for cpg events, or -1 on failure. */
+
+int setup_cpg(void)
+{
+	cpg_error_t error;
+	int fd = -1;
+
+	error = cpg_initialize(&daemon_handle, &callbacks);
+	if (error != CPG_OK) {
+		log_error("cpg_initialize error %d", error);
+		return -1;
+	}
+
+	/* fd starts at -1 so a failed cpg_fd_get() can never be mistaken
+	   for a usable descriptor 0. */
+	error = cpg_fd_get(daemon_handle, &fd);
+	if (error != CPG_OK || fd < 0) {
+		log_error("cpg_fd_get error %d fd %d", error, fd);
+		goto fail;
+	}
+
+	memset(&daemon_name, 0, sizeof(daemon_name));
+	strcpy(daemon_name.value, "gfs_controld");
+	daemon_name.length = strlen(daemon_name.value);
+
+ retry:
+	error = cpg_join(daemon_handle, &daemon_name);
+	if (error == CPG_ERR_TRY_AGAIN) {
+		log_debug("setup_cpg cpg_join retry");
+		sleep(1);
+		goto retry;
+	}
+	if (error != CPG_OK) {
+		log_error("cpg_join error %d", error);
+		goto fail;
+	}
+
+	log_debug("cpg %d", fd);
+	return fd;
+
+ fail:
+	cpg_finalize(daemon_handle);
+	return -1;
+}
+
+/* Multicast buf to the joined group with agreed ordering, retrying for as
+   long as cpg answers CPG_ERR_TRY_AGAIN.
+   Returns 0 on success, -1 on any other cpg error. */
+
+static int _send_message(cpg_handle_t h, void *buf, int len)
+{
+	struct iovec iov;
+	cpg_error_t error;
+	int retries = 0;
+
+	iov.iov_base = buf;
+	iov.iov_len = len;
+
+ retry:
+	error = cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1);
+	if (error == CPG_ERR_TRY_AGAIN) {
+		/* FIXME: backoff say .25 sec, .5 sec, .75 sec, 1 sec */
+		retries++;
+		if (retries > 3)
+			sleep(1);
+		goto retry;
+	}
+	if (error != CPG_OK) {
+		log_error("cpg_mcast_joined error %d handle %llx", error,
+			  (unsigned long long) h);
+		return -1;
+	}
+
+	return 0;
+}
+
+/* Complete the gdlm_header at the start of buf (version, byte order, name
+   of mg) and multicast the len-byte message on the daemon cpg.  The caller
+   has already set type, nodeid and to_nodeid in host byte order.
+   Returns 0 on success, -1 on failure. */
+
+int send_group_message(struct mountgroup *mg, int len, char *buf)
+{
+	struct gdlm_header *hd = (struct gdlm_header *) buf;
+
+	/* Receivers need a whole header and drop anything over MAX_MSGLEN. */
+	if (len < (int) sizeof(struct gdlm_header) || len > MAX_MSGLEN) {
+		log_error("send_group_message bad len %d", len);
+		return -1;
+	}
+
+	hd->version[0]	= cpu_to_le16(GDLM_VER_MAJOR);
+	hd->version[1]	= cpu_to_le16(GDLM_VER_MINOR);
+	hd->version[2]	= cpu_to_le16(GDLM_VER_PATCH);
+	hd->type	= cpu_to_le16(hd->type);
+	hd->nodeid	= cpu_to_le32(hd->nodeid);
+	hd->to_nodeid	= cpu_to_le32(hd->to_nodeid);
+
+	/* name is a fixed-width field: strncpy bounds the copy and
+	   zero-pads; a name of MAXNAME bytes goes out unterminated. */
+	strncpy(hd->name, mg->name, sizeof(hd->name));
+
+	return _send_message(daemon_handle, buf, len);
+}
+
diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/group.c cluster/gfs/lock_dlm/daemon/group.c
--- cluster-HEAD/gfs/lock_dlm/daemon/group.c	2006-06-07 12:10:32.102338261 -0500
+++ cluster/gfs/lock_dlm/daemon/group.c	2006-06-06 17:23:06.523976113 -0500
@@ -21,25 +21,14 @@
 static int cb_event_nr;
 static unsigned int cb_id;
 static int cb_type;
-static int cb_nodeid;
-static int cb_len;
 static int cb_member_count;
 static int cb_members[MAX_GROUP_MEMBERS];
-static char cb_message[MAX_MSGLEN+1];
 
 int do_stop(struct mountgroup *mg);
 int do_finish(struct mountgroup *mg);
 int do_terminate(struct mountgroup *mg);
 int do_start(struct mountgroup *mg, int type, int count, int *nodeids);
 
-void receive_journals(struct mountgroup *mg, char *buf, int len, int from);
-void receive_options(struct mountgroup *mg, char *buf, int len, int from);
-void receive_remount(struct mountgroup *mg, char *buf, int len, int from);
-void receive_plock(struct mountgroup *mg, char *buf, int len, int from);
-void receive_recovery_status(struct mountgroup *mg, char *buf, int len,
-			     int from);
-void receive_recovery_done(struct mountgroup *mg, char *buf, int len, int from);
-
 
 static void stop_cbfn(group_handle_t h, void *private, char *name)
 {
@@ -87,17 +76,9 @@
 static void deliver_cbfn(group_handle_t h, void *private, char *name,
 			 int nodeid, int len, char *buf)
 {
-	int n;
-	cb_action = DO_DELIVER;
-	strncpy(cb_name, name, MAX_GROUP_NAME_LEN);
-	cb_nodeid = nodeid;
-	cb_len = n = len;
-	if (len > MAX_MSGLEN)
-		n = MAX_MSGLEN;
-	memcpy(&cb_message, buf, n);
 }
 
-group_callbacks_t callbacks = {
+static group_callbacks_t callbacks = {
 	stop_cbfn,
 	start_cbfn,
 	finish_cbfn,
@@ -106,53 +87,6 @@
 	deliver_cbfn
 };
 
-static void do_deliver(struct mountgroup *mg)
-{
-	struct gdlm_header *hd;
-
-	hd = (struct gdlm_header *) cb_message;
-
-	/* If there are some group messages between a new node being added to
-	   the cpg group and being added to the app group, the new node should
-	   discard them since they're only relevant to the app group. */
-
-	if (!mg->last_callback) {
-		log_group(mg, "discard message type %d len %d from %d",
-			  hd->type, cb_len, cb_nodeid);
-		return;
-	}
-
-	switch (hd->type) {
-	case MSG_JOURNAL:
-		receive_journals(mg, cb_message, cb_len, cb_nodeid);
-		break;
-
-	case MSG_OPTIONS:
-		receive_options(mg, cb_message, cb_len, cb_nodeid);
-		break;
-
-	case MSG_REMOUNT:
-		receive_remount(mg, cb_message, cb_len, cb_nodeid);
-		break;
-
-	case MSG_PLOCK:
-		receive_plock(mg, cb_message, cb_len, cb_nodeid);
-		break;
-
-	case MSG_RECOVERY_STATUS:
-		receive_recovery_status(mg, cb_message, cb_len, cb_nodeid);
-		break;
-
-	case MSG_RECOVERY_DONE:
-		receive_recovery_done(mg, cb_message, cb_len, cb_nodeid);
-		break;
-
-	default:
-		log_error("unknown message type %d from %d",
-			  hd->type, hd->nodeid);
-	}
-}
-
 char *str_members(void)
 {
 	static char buf[MAXLINE];
@@ -222,12 +156,6 @@
 		mg->id = cb_id;
 		break;
 
-	case DO_DELIVER:
-		log_debug("groupd callback: deliver %s len %d nodeid %d",
-			  cb_name, cb_len, cb_nodeid);
-		do_deliver(mg);
-		break;
-
 	default:
 		error = -EINVAL;
 	}
@@ -257,15 +185,3 @@
 	return rv;
 }
 
-int send_group_message(struct mountgroup *mg, int len, char *buf)
-{
-	int error;
-
-	error = group_send(gh, mg->name, len, buf);
-	if (error < 0)
-		log_error("group_send error %d errno %d", error, errno);
-	else
-		error = 0;
-	return error;
-}
-
diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/lock_dlm.h cluster/gfs/lock_dlm/daemon/lock_dlm.h
--- cluster-HEAD/gfs/lock_dlm/daemon/lock_dlm.h	2006-05-25 14:30:40.000000000 -0500
+++ cluster/gfs/lock_dlm/daemon/lock_dlm.h	2006-06-06 17:18:25.510916543 -0500
@@ -201,11 +201,16 @@
 	MSG_RECOVERY_DONE,
 };
 
+#define GDLM_VER_MAJOR 1
+#define GDLM_VER_MINOR 0
+#define GDLM_VER_PATCH 0
+
 struct gdlm_header {
 	uint16_t		version[3];
 	uint16_t		type;			/* MSG_ */
 	uint32_t		nodeid;			/* sender */
 	uint32_t		to_nodeid;		/* 0 if to all */
+	char			name[MAXNAME];
 };
 
 
@@ -214,6 +219,8 @@
 
 int setup_cman(void);
 int process_cman(void);
+int setup_cpg(void);
+int process_cpg(void);
 int setup_groupd(void);
 int process_groupd(void);
 int setup_libdlm(void);
diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/main.c cluster/gfs/lock_dlm/daemon/main.c
--- cluster-HEAD/gfs/lock_dlm/daemon/main.c	2006-04-21 14:54:10.000000000 -0500
+++ cluster/gfs/lock_dlm/daemon/main.c	2006-06-07 11:59:12.248223925 -0500
@@ -25,6 +25,7 @@
 static struct pollfd pollfd[MAX_CLIENTS];
 
 static int cman_fd;
+static int cpg_fd;
 static int listen_fd;
 static int groupd_fd;
 static int uevent_fd;
@@ -249,6 +250,11 @@
 		goto out;
 	client_add(cman_fd, &maxi);
 
+	rv = cpg_fd = setup_cpg();
+	if (rv < 0)
+		goto out;
+	client_add(cpg_fd, &maxi);
+
 	rv = groupd_fd = setup_groupd();
 	if (rv < 0)
 		goto out;
@@ -272,6 +278,8 @@
 		goto out;
 	client_add(plocks_fd, &maxi);
 
+	log_debug("setup done");
+
 	for (;;) {
 		rv = poll(pollfd, maxi + 1, -1);
 		if (rv < 0)
@@ -296,6 +304,8 @@
 					process_groupd();
 				else if (pollfd[i].fd == cman_fd)
 					process_cman();
+				else if (pollfd[i].fd == cpg_fd)
+					process_cpg();
 				else if (pollfd[i].fd == uevent_fd)
 					process_uevent();
 				else if (!no_withdraw &&
@@ -310,7 +320,6 @@
 			if (pollfd[i].revents & POLLHUP) {
 				if (pollfd[i].fd == cman_fd)
 					exit_cman();
-				log_debug("closing fd %d", pollfd[i].fd);
 				close(pollfd[i].fd);
 			}
 		}


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]