[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

Re: [Linux-cluster] [gfs_controld] send messages through separate cpg



Dave,

I'd say the cpg bits look really good except for the mcast operation
(where you have a FIXME).

I'd recommend not backing off here, but instead spinning on the transmit
(i.e. retrying cpg_mcast_joined() immediately) for as long as
CPG_ERR_TRY_AGAIN is returned.  Even on a heavily loaded system the delay
caused by spinning should not be very significant, unless this code has
certain timeouts (not sure about that) that would expire in the meantime.
That appears not to be the case, since the code itself suggests backing
off using a timer.

Regards
-steve

On Wed, 2006-06-07 at 12:27 -0500, David Teigland wrote:
> [new process requires all work to be sent to the mailing list prior to CVS check-in]
> 
> Set up a separate cpg (closed process group) for sending messages (e.g.
> for processing mount/unmount) instead of sending them through the cpg
> used to represent the mount group.  Since we apply cpg changes to the
> mount group asynchronously, that cpg won't always contain all the nodes
> we need to process the mount/unmount.  Without this change, a mount from
> one node in parallel with an unmount from another often won't work.
> 
> 
> diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/Makefile cluster/gfs/lock_dlm/daemon/Makefile
> --- cluster-HEAD/gfs/lock_dlm/daemon/Makefile	2006-03-27 01:31:46.000000000 -0600
> +++ cluster/gfs/lock_dlm/daemon/Makefile	2006-06-06 17:19:40.740421037 -0500
> @@ -21,6 +21,7 @@
>  	-I../../include/ \
>  	-I../../../group/lib/ \
>  	-I../../../cman/lib/ \
> +	-I../../../cman/daemon/openais/trunk/include/ \
>  	-I../../../dlm/lib/ \
>  	-I../../../gfs-kernel/src/dlm/
>  
> @@ -33,12 +34,14 @@
>  
>  gfs_controld: 	main.o \
>  		member_cman.o \
> +		cpg.o \
>  		group.o \
>  		plock.o \
>  		recover.o \
>  		withdraw.o \
>  		../../../dlm/lib/libdlm_lt.a \
>  		../../../cman/lib/libcman.a \
> +		../../../cman/daemon/openais/trunk/lib/libcpg.a \
>  		../../../group/lib/libgroup.a
>  	$(CC) $(LDFLAGS) -o $@ $^
>  
> @@ -49,6 +52,9 @@
>  member_cman.o: member_cman.c
>  	$(CC) $(CFLAGS) -c -o $@ $<
>  
> +cpg.o: cpg.c
> +	$(CC) $(CFLAGS) -c -o $@ $<
> +
>  recover.o: recover.c
>  	$(CC) $(CFLAGS) -c -o $@ $<
>  
> diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/cpg.c cluster/gfs/lock_dlm/daemon/cpg.c
> --- cluster-HEAD/gfs/lock_dlm/daemon/cpg.c	1969-12-31 18:00:00.000000000 -0600
> +++ cluster/gfs/lock_dlm/daemon/cpg.c	2006-06-07 11:54:28.478585576 -0500
> @@ -0,0 +1,212 @@
> +/******************************************************************************
> +*******************************************************************************
> +**
> +**  Copyright (C) 2006 Red Hat, Inc.  All rights reserved.
> +**
> +**  This copyrighted material is made available to anyone wishing to use,
> +**  modify, copy, or redistribute it subject to the terms and conditions
> +**  of the GNU General Public License v.2.
> +**
> +*******************************************************************************
> +******************************************************************************/
> +
> +#include "lock_dlm.h"
> +#include "cpg.h"
> +
> +static cpg_handle_t	daemon_handle;
> +static struct cpg_name	daemon_name;
> +static int		got_msg;
> +static int		saved_nodeid;
> +static int		saved_len;
> +static char		saved_data[MAX_MSGLEN];
> +
> +void receive_journals(struct mountgroup *mg, char *buf, int len, int from);
> +void receive_options(struct mountgroup *mg, char *buf, int len, int from);
> +void receive_remount(struct mountgroup *mg, char *buf, int len, int from);
> +void receive_plock(struct mountgroup *mg, char *buf, int len, int from);
> +void receive_recovery_status(struct mountgroup *mg, char *buf, int len,
> +			     int from);
> +void receive_recovery_done(struct mountgroup *mg, char *buf, int len, int from);
> +
> +
> +static void do_deliver(int nodeid, char *data, int len)
> +{
> +	struct mountgroup *mg;
> +	struct gdlm_header *hd;
> +
> +	hd = (struct gdlm_header *) data;
> +
> +	mg = find_mg(hd->name);
> +	if (!mg)
> +		return;
> +
> +	hd->version[0]	= le16_to_cpu(hd->version[0]);
> +	hd->version[1]	= le16_to_cpu(hd->version[1]);
> +	hd->version[2]	= le16_to_cpu(hd->version[2]);
> +	hd->type	= le16_to_cpu(hd->type);
> +	hd->nodeid	= le32_to_cpu(hd->nodeid);
> +	hd->to_nodeid	= le32_to_cpu(hd->to_nodeid);
> +
> +	if (hd->version[0] != GDLM_VER_MAJOR) {
> +		log_error("reject message version %u.%u.%u",
> +			  hd->version[0], hd->version[1], hd->version[2]);
> +		return;
> +	}
> +
> +	/* If there are some group messages between a new node being added to
> +	   the cpg group and being added to the app group, the new node should
> +	   discard them since they're only relevant to the app group. */
> +
> +	if (!mg->last_callback) {
> +		log_group(mg, "discard message type %d len %d from %d",
> +			  hd->type, len, nodeid);
> +		return;
> +	}
> +
> +	switch (hd->type) {
> +	case MSG_JOURNAL: 
> +		receive_journals(mg, data, len, nodeid);
> +		break;
> +
> +	case MSG_OPTIONS:
> +		receive_options(mg, data, len, nodeid);
> +		break;
> +
> +	case MSG_REMOUNT:
> +		receive_remount(mg, data, len, nodeid);
> +		break;
> +
> +	case MSG_PLOCK:
> +		receive_plock(mg, data, len, nodeid);
> +		break;
> +
> +	case MSG_RECOVERY_STATUS:
> +		receive_recovery_status(mg, data, len, nodeid);
> +		break;
> +
> +	case MSG_RECOVERY_DONE:
> +		receive_recovery_done(mg, data, len, nodeid);
> +		break;
> +
> +	default:
> +		log_error("unknown message type %d from %d",
> +			  hd->type, hd->nodeid);
> +	}
> +}
> +
> +void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name,
> +		uint32_t nodeid, uint32_t pid, void *data, int data_len)
> +{
> +	saved_nodeid = nodeid;
> +	saved_len = data_len;
> +	memcpy(saved_data, data, data_len);
> +	got_msg = 1;
> +}
> +
> +void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
> +		struct cpg_address *member_list, int member_list_entries,
> +		struct cpg_address *left_list, int left_list_entries,
> +		struct cpg_address *joined_list, int joined_list_entries)
> +{
> +}
> +
> +static cpg_callbacks_t callbacks = {
> +	.cpg_deliver_fn = deliver_cb,
> +	.cpg_confchg_fn = confchg_cb,
> +};
> +
> +int process_cpg(void)
> +{
> +	cpg_error_t error;
> +	
> +	got_msg = 0;
> +	saved_len = 0;
> +	saved_nodeid = 0;
> +	memset(saved_data, 0, sizeof(saved_data));
> +
> +	error = cpg_dispatch(daemon_handle, CPG_DISPATCH_ONE);
> +	if (error != CPG_OK) {
> +		log_error("cpg_dispatch error %d", error);
> +		return -1;
> +	}
> +
> +	if (got_msg)
> +		do_deliver(saved_nodeid, saved_data, saved_len);
> +	return 0;
> +}
> +
> +int setup_cpg(void)
> +{
> +	cpg_error_t error;
> +	int fd = 0;
> +
> +	error = cpg_initialize(&daemon_handle, &callbacks);
> +	if (error != CPG_OK) {
> +		log_error("cpg_initialize error %d", error);
> +		return -1;
> +	}
> +
> +	cpg_fd_get(daemon_handle, &fd);
> +	if (fd < 0)
> +		return -1;
> +
> +	memset(&daemon_name, 0, sizeof(daemon_name));
> +	strcpy(daemon_name.value, "gfs_controld");
> +	daemon_name.length = 12;
> +
> + retry:
> +	error = cpg_join(daemon_handle, &daemon_name);
> +	if (error == CPG_ERR_TRY_AGAIN) {
> +		log_debug("setup_cpg cpg_join retry");
> +		sleep(1);
> +		goto retry;
> +	}
> +	if (error != CPG_OK) {
> +		log_error("cpg_join error %d", error);
> +		cpg_finalize(daemon_handle);
> +		return -1;
> +	}
> +
> +	log_debug("cpg %d", fd);
> +	return fd;
> +}
> +
> +static int _send_message(cpg_handle_t h, void *buf, int len)
> +{
> +	struct iovec iov;
> +	cpg_error_t error;
> +	int retries = 0;
> +
> +	iov.iov_base = buf;
> +	iov.iov_len = len;
> +
> + retry:
> +	error = cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1);
> +	if (error != CPG_OK)
> +		log_error("cpg_mcast_joined error %d handle %llx", error, h);
> +	if (error == CPG_ERR_TRY_AGAIN) {
> +		/* FIXME: backoff say .25 sec, .5 sec, .75 sec, 1 sec */
> +		retries++;
> +		if (retries > 3)
> +			sleep(1);
> +		goto retry;
> +	}
> +
> +	return 0;
> +}
> +
> +int send_group_message(struct mountgroup *mg, int len, char *buf)
> +{
> +	struct gdlm_header *hd = (struct gdlm_header *) buf;
> +
> +	hd->version[0]	= cpu_to_le16(GDLM_VER_MAJOR);
> +	hd->version[1]	= cpu_to_le16(GDLM_VER_MINOR);
> +	hd->version[2]	= cpu_to_le16(GDLM_VER_PATCH);
> +	hd->type	= cpu_to_le16(hd->type);
> +	hd->nodeid	= cpu_to_le32(hd->nodeid);
> +	hd->to_nodeid	= cpu_to_le32(hd->to_nodeid);
> +	memcpy(hd->name, mg->name, strlen(mg->name));
> +	
> +	return _send_message(daemon_handle, buf, len);
> +}
> +
> diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/group.c cluster/gfs/lock_dlm/daemon/group.c
> --- cluster-HEAD/gfs/lock_dlm/daemon/group.c	2006-06-07 12:10:32.102338261 -0500
> +++ cluster/gfs/lock_dlm/daemon/group.c	2006-06-06 17:23:06.523976113 -0500
> @@ -21,25 +21,14 @@
>  static int cb_event_nr;
>  static unsigned int cb_id;
>  static int cb_type;
> -static int cb_nodeid;
> -static int cb_len;
>  static int cb_member_count;
>  static int cb_members[MAX_GROUP_MEMBERS];
> -static char cb_message[MAX_MSGLEN+1];
>  
>  int do_stop(struct mountgroup *mg);
>  int do_finish(struct mountgroup *mg);
>  int do_terminate(struct mountgroup *mg);
>  int do_start(struct mountgroup *mg, int type, int count, int *nodeids);
>  
> -void receive_journals(struct mountgroup *mg, char *buf, int len, int from);
> -void receive_options(struct mountgroup *mg, char *buf, int len, int from);
> -void receive_remount(struct mountgroup *mg, char *buf, int len, int from);
> -void receive_plock(struct mountgroup *mg, char *buf, int len, int from);
> -void receive_recovery_status(struct mountgroup *mg, char *buf, int len,
> -			     int from);
> -void receive_recovery_done(struct mountgroup *mg, char *buf, int len, int from);
> -
>  
>  static void stop_cbfn(group_handle_t h, void *private, char *name)
>  {
> @@ -87,17 +76,9 @@
>  static void deliver_cbfn(group_handle_t h, void *private, char *name,
>  			 int nodeid, int len, char *buf)
>  {
> -	int n;
> -	cb_action = DO_DELIVER;
> -	strncpy(cb_name, name, MAX_GROUP_NAME_LEN);
> -	cb_nodeid = nodeid;
> -	cb_len = n = len;
> -	if (len > MAX_MSGLEN)
> -		n = MAX_MSGLEN;
> -	memcpy(&cb_message, buf, n);
>  }
>  
> -group_callbacks_t callbacks = {
> +static group_callbacks_t callbacks = {
>  	stop_cbfn,
>  	start_cbfn,
>  	finish_cbfn,
> @@ -106,53 +87,6 @@
>  	deliver_cbfn
>  };
>  
> -static void do_deliver(struct mountgroup *mg)
> -{
> -	struct gdlm_header *hd;
> -
> -	hd = (struct gdlm_header *) cb_message;
> -
> -	/* If there are some group messages between a new node being added to
> -	   the cpg group and being added to the app group, the new node should
> -	   discard them since they're only relevant to the app group. */
> -
> -	if (!mg->last_callback) {
> -		log_group(mg, "discard message type %d len %d from %d",
> -			  hd->type, cb_len, cb_nodeid);
> -		return;
> -	}
> -
> -	switch (hd->type) {
> -	case MSG_JOURNAL:
> -		receive_journals(mg, cb_message, cb_len, cb_nodeid);
> -		break;
> -
> -	case MSG_OPTIONS:
> -		receive_options(mg, cb_message, cb_len, cb_nodeid);
> -		break;
> -
> -	case MSG_REMOUNT:
> -		receive_remount(mg, cb_message, cb_len, cb_nodeid);
> -		break;
> -
> -	case MSG_PLOCK:
> -		receive_plock(mg, cb_message, cb_len, cb_nodeid);
> -		break;
> -
> -	case MSG_RECOVERY_STATUS:
> -		receive_recovery_status(mg, cb_message, cb_len, cb_nodeid);
> -		break;
> -
> -	case MSG_RECOVERY_DONE:
> -		receive_recovery_done(mg, cb_message, cb_len, cb_nodeid);
> -		break;
> -
> -	default:
> -		log_error("unknown message type %d from %d",
> -			  hd->type, hd->nodeid);
> -	}
> -}
> -
>  char *str_members(void)
>  {
>  	static char buf[MAXLINE];
> @@ -222,12 +156,6 @@
>  		mg->id = cb_id;
>  		break;
>  
> -	case DO_DELIVER:
> -		log_debug("groupd callback: deliver %s len %d nodeid %d",
> -			  cb_name, cb_len, cb_nodeid);
> -		do_deliver(mg);
> -		break;
> -
>  	default:
>  		error = -EINVAL;
>  	}
> @@ -257,15 +185,3 @@
>  	return rv;
>  }
>  
> -int send_group_message(struct mountgroup *mg, int len, char *buf)
> -{
> -	int error;
> -
> -	error = group_send(gh, mg->name, len, buf);
> -	if (error < 0)
> -		log_error("group_send error %d errno %d", error, errno);
> -	else
> -		error = 0;
> -	return error;
> -}
> -
> diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/lock_dlm.h cluster/gfs/lock_dlm/daemon/lock_dlm.h
> --- cluster-HEAD/gfs/lock_dlm/daemon/lock_dlm.h	2006-05-25 14:30:40.000000000 -0500
> +++ cluster/gfs/lock_dlm/daemon/lock_dlm.h	2006-06-06 17:18:25.510916543 -0500
> @@ -201,11 +201,16 @@
>  	MSG_RECOVERY_DONE,
>  };
>  
> +#define GDLM_VER_MAJOR 1
> +#define GDLM_VER_MINOR 0
> +#define GDLM_VER_PATCH 0
> +
>  struct gdlm_header {
>  	uint16_t		version[3];
>  	uint16_t		type;			/* MSG_ */
>  	uint32_t		nodeid;			/* sender */
>  	uint32_t		to_nodeid;		/* 0 if to all */
> +	char			name[MAXNAME];
>  };
>  
> 
> @@ -214,6 +219,8 @@
>  
>  int setup_cman(void);
>  int process_cman(void);
> +int setup_cpg(void);
> +int process_cpg(void);
>  int setup_groupd(void);
>  int process_groupd(void);
>  int setup_libdlm(void);
> diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/main.c cluster/gfs/lock_dlm/daemon/main.c
> --- cluster-HEAD/gfs/lock_dlm/daemon/main.c	2006-04-21 14:54:10.000000000 -0500
> +++ cluster/gfs/lock_dlm/daemon/main.c	2006-06-07 11:59:12.248223925 -0500
> @@ -25,6 +25,7 @@
>  static struct pollfd pollfd[MAX_CLIENTS];
>  
>  static int cman_fd;
> +static int cpg_fd;
>  static int listen_fd;
>  static int groupd_fd;
>  static int uevent_fd;
> @@ -249,6 +250,11 @@
>  		goto out;
>  	client_add(cman_fd, &maxi);
>  
> +	rv = cpg_fd = setup_cpg();
> +	if (rv < 0)
> +		goto out;
> +	client_add(cpg_fd, &maxi);
> +
>  	rv = groupd_fd = setup_groupd();
>  	if (rv < 0)
>  		goto out;
> @@ -272,6 +278,8 @@
>  		goto out;
>  	client_add(plocks_fd, &maxi);
>  
> +	log_debug("setup done");
> +
>  	for (;;) {
>  		rv = poll(pollfd, maxi + 1, -1);
>  		if (rv < 0)
> @@ -296,6 +304,8 @@
>  					process_groupd();
>  				else if (pollfd[i].fd == cman_fd)
>  					process_cman();
> +				else if (pollfd[i].fd == cpg_fd)
> +					process_cpg();
>  				else if (pollfd[i].fd == uevent_fd)
>  					process_uevent();
>  				else if (!no_withdraw &&
> @@ -310,7 +320,6 @@
>  			if (pollfd[i].revents & POLLHUP) {
>  				if (pollfd[i].fd == cman_fd)
>  					exit_cman();
> -				log_debug("closing fd %d", pollfd[i].fd);
>  				close(pollfd[i].fd);
>  			}
>  		}
> 
> --
> Linux-cluster mailing list
> Linux-cluster redhat com
> https://www.redhat.com/mailman/listinfo/linux-cluster


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]