[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Cluster-devel] cluster/rgmanager ChangeLog include/cman-priva ...



CVSROOT:	/cvs/cluster
Module name:	cluster
Changes by:	lhh sourceware org	2006-08-07 22:05:02

Modified files:
	rgmanager      : ChangeLog 
	rgmanager/include: cman-private.h message.h 
	rgmanager/src/clulib: alloc.c cman.c message.c msg_cluster.c 
	                      msgtest.c vft.c 
	rgmanager/src/daemons: main.c rg_forward.c rg_state.c 
	rgmanager/src/utils: clustat.c 
Added files:
	rgmanager/src/clulib: ckpt_state.c 

Log message:
	* src/clulib/ckpt_state.c: Preliminary implementation of replacement
	for VF using AIS CKPT B.02.01 (w/ built-in test program)
	* include/cman-private.h: Clean up APIs (cman APIs return
	cman_handle_t, which is already void *; we were wrongly using
	cman_handle_t *, i.e. void **, all over)
	* include/message.h: Bump context count to 128, add destination
	node ID in header of packets.
	* src/clulib/alloc.c: If the requested size is no larger than the
	existing block's size, return the same block
	* src/clulib/cman.c: API cleanups
	* src/clulib/message.c: Add error checking to msg_print
	* src/clulib/msg_cluster.c: Check destination in header before
	processing message.  Remove dup #define for MAX_CONTEXTS, add
	proto_error() macro for displaying protocol errors.  Use 'max'
	instead of 'fd' for select().  Use correct var when assigning
	contexts.  Fix CMAN handles.  Return correct size from msg_send()
	requests.
	* src/clulib/msgtest.c: Fix CMAN handles
	* src/clulib/vft.c: Don't handle VF_CURRENT inside comms thread
	* src/daemons/main.c: Check to see if nodes are listening on our
	port before we consider them running.  Handle VF_CURRENT requests
	from other nodes.  Fail if we can't determine local node ID
	* src/daemons/rg_forward.c: Give 10 minutes for responses to
	forwarded requests.
	* src/daemons/rg_state.c: Shorten RG state names.  Fix 'Uncertain'
	output line.
	* src/utils/clustat.c: Fix ccs_member_list() function.

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/ChangeLog.diff?cvsroot=cluster&r1=1.17&r2=1.18
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/include/cman-private.h.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/include/message.h.diff?cvsroot=cluster&r1=1.2&r2=1.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/ckpt_state.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/alloc.c.diff?cvsroot=cluster&r1=1.8&r2=1.9
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/cman.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/message.c.diff?cvsroot=cluster&r1=1.2&r2=1.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msg_cluster.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msgtest.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/vft.c.diff?cvsroot=cluster&r1=1.14&r2=1.15
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/main.c.diff?cvsroot=cluster&r1=1.26&r2=1.27
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_forward.c.diff?cvsroot=cluster&r1=1.5&r2=1.6
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_state.c.diff?cvsroot=cluster&r1=1.18&r2=1.19
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/utils/clustat.c.diff?cvsroot=cluster&r1=1.18&r2=1.19

--- cluster/rgmanager/ChangeLog	2006/06/02 17:37:10	1.17
+++ cluster/rgmanager/ChangeLog	2006/08/07 22:05:01	1.18
@@ -1,3 +1,31 @@
+2006-08-07 Lon Hohberger <lhh at redhat.com>
+	* src/clulib/ckpt_state.c: Preliminary implementation of replacement
+	for VF using AIS CKPT B.02.01 (w/ built-in test program)
+	* include/cman-private.h: Clean up APIs (cman APIs return
+	cman_handle_t, which is void*, should be using void ** all over)
+	* include/message.h: Bump context count to 128, add destination
+	node ID in header of packets.
+	* src/clulib/alloc.c: If we alloc the same size, return the same
+	block
+	* src/clulib/cman.c: API cleanups
+	* src/clulib/message.c: Add error checking to msg_print
+	* src/clulib/msg_cluster.c: Check destination in header before 
+        processing message remove dup #define for MAX_CONTEXTS, add
+	proto_error() macro for displaying protocol errors.  Use 'max' 
+	instead of 'fd' for select().  Use correct var when assigning
+	contexts.  Fix CMAN handles.  Return correct size from msg_send()
+	requests.
+	* src/clulib/msgtest.c: Fix CMAN handles
+	* src/clulib/vft.c: Don't handle VF_CURRENT inside comms thread
+	* src/daemons/main.c: Check to see if nodes are listening on our
+	port before we consider them running.  Handle VF_CURRENT requests
+	from other nodes.  Fail if we can't determine local node ID
+	* src/daemons/rg_forward.c: Give 10 minutes for responses to
+	forwarded requests.
+	* src/daemons/rg_state.c: Shorten RG state names.  Fix 'Uncertain'
+	output line.
+	* src/utils/clustat.c: Fix ccs_member_list() function.
+
 2006-05-23 Lon Hohberger <lhh at redhat.com>
 	* src/daemons/members.c: Zap pad fields on copy-out
 	* src/daemons/main.c: Give notice if skipping an event because of
--- cluster/rgmanager/include/cman-private.h	2006/07/12 14:04:06	1.1
+++ cluster/rgmanager/include/cman-private.h	2006/08/07 22:05:01	1.2
@@ -3,11 +3,11 @@
 
 #include <libcman.h>
 
-int cman_init_subsys(cman_handle_t *ch);
-cman_handle_t *cman_lock(int block, int sig);
-cman_handle_t *cman_lock_preemptible(int block, int *fd);
+int cman_init_subsys(cman_handle_t ch);
+cman_handle_t cman_lock(int block, int sig);
+cman_handle_t cman_lock_preemptible(int block, int *fd);
 int cman_cleanup_subsys(void);
-int cman_unlock(cman_handle_t *ch);
+int cman_unlock(cman_handle_t ch);
 int cman_send_data_unlocked(void *buf, int len, int flags,
 			    uint8_t port, int nodeid);
 
--- cluster/rgmanager/include/message.h	2006/07/12 14:04:06	1.2
+++ cluster/rgmanager/include/message.h	2006/08/07 22:05:01	1.3
@@ -30,14 +30,17 @@
 
 /* Header is never presented to applications */
 typedef struct {
-	uint32_t	dest_ctx;
 	uint32_t	src_ctx;
-	/* 8 */
 	uint32_t	src_nodeid;
+	/* 8 */
+	uint32_t	dest_ctx;
+	uint32_t	dest_nodeid;
+	/* 16 */
 	uint8_t		msg_control;
 	uint8_t		msg_port;
 	uint8_t		pad[2];
-	/* 16 */
+	/* 20 */
+	uint8_t		msg_reserved[12];
 } cluster_msg_hdr_t;
 
 /* Header is never presented to applications */
@@ -62,6 +65,7 @@
 	swab32((ptr)->dest_ctx);\
 	swab32((ptr)->src_ctx);\
 	swab32((ptr)->src_nodeid);\
+	swab32((ptr)->dest_nodeid);\
 }
 
 
@@ -123,7 +127,7 @@
 
 
 /* Ripped from ccsd's setup_local_socket */
-#define MAX_CONTEXTS 32  /* Testing; production should be 1024-ish */
+#define MAX_CONTEXTS 128  /* Testing; production should be 1024-ish */
 
 #define SKF_LISTEN (1<<0)
 #define SKF_READ   (1<<1)
/cvs/cluster/cluster/rgmanager/src/clulib/ckpt_state.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/ckpt_state.c
+++ -	2006-08-07 22:05:02.499001000 +0000
@@ -0,0 +1,550 @@
+/*
+  Copyright Red Hat, Inc. 2002-2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge,
+  MA 02139, USA.
+*/
+//#define DEBUG
+/** @file
+ * Distributed states using saCkpt interface
+ */
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/resource.h>
+#include <sys/wait.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <pthread.h>
+#include <saAis.h>
+#include <saCkpt.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <assert.h>
+#include <ds.h>
+
+typedef struct _key_node {
+	struct _key_node *kn_next;
+	char *kn_keyid;
+	SaTimeT kn_timeout;
+	uint16_t kn_ready; 
+	SaNameT kn_cpname;
+	SaCkptCheckpointHandleT kn_cph;
+} key_node_t;
+
+
+static key_node_t *key_list = NULL;
+static SaCkptHandleT ds_ckpt;
+static int ds_ready = 0;
+static pthread_mutex_t ds_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+
+int ais_to_posix(SaAisErrorT err);
+
+
+static key_node_t *
+kn_find_key(char *keyid)
+{
+	key_node_t *cur;
+
+	for (cur = key_list; cur; cur = cur->kn_next)
+		if (!strcmp(cur->kn_keyid,keyid))
+			return cur;
+
+	return NULL;
+}
+
+
+/**
+ * Adds a key to key node list and sets up callback functions.
+ */
+static SaAisErrorT
+ds_key_init_nt(char *keyid, int maxsize, int timeout)
+{
+	SaCkptCheckpointCreationAttributesT attrs;
+	SaCkptCheckpointOpenFlagsT flags;
+	SaCkptCheckpointDescriptorT status;
+	SaAisErrorT err;
+	key_node_t *newnode = NULL;
+	
+	newnode = kn_find_key(keyid);
+	if (newnode) {
+		printf("Key %s already initialized\n", keyid);
+		return SA_AIS_OK;
+	}
+
+	newnode = malloc(sizeof(*newnode));
+	memset(newnode,0,sizeof(*newnode));
+	snprintf((char *)newnode->kn_cpname.value, SA_MAX_NAME_LENGTH-1,
+		 "%s", keyid);
+	newnode->kn_cpname.length = strlen(keyid);
+	newnode->kn_keyid = (char *)newnode->kn_cpname.value;
+	newnode->kn_ready = 0;
+
+	if (timeout < 5) {
+		/* Join View message timeout must exceed the
+		   coordinator timeout */
+		timeout = 5;
+	}
+	newnode->kn_timeout = timeout * SA_TIME_ONE_SECOND;
+
+	flags = SA_CKPT_CHECKPOINT_READ |
+		SA_CKPT_CHECKPOINT_WRITE;
+
+	err = saCkptCheckpointOpen(ds_ckpt,
+				   &newnode->kn_cpname,
+				   NULL,	
+				   flags,
+				   newnode->kn_timeout,
+				   &newnode->kn_cph);
+
+	if (err == SA_AIS_OK) {
+		saCkptCheckpointStatusGet(newnode->kn_cph,
+					  &status);
+
+		printf("Checkpoint Size = %d bytes\n", (int)
+			status.checkpointCreationAttributes.checkpointSize);
+		printf("Flags = ");
+		if (status.checkpointCreationAttributes.creationFlags &
+			SA_CKPT_WR_ALL_REPLICAS) {
+			printf("%s ", "SA_CKPT_WR_ALL_REPLICAS");
+		}
+		if (status.checkpointCreationAttributes.creationFlags &
+			SA_CKPT_WR_ACTIVE_REPLICA) {
+			printf("%s ", "SA_CKPT_WR_ACTIVE_REPLICA");
+		}
+		if (status.checkpointCreationAttributes.creationFlags &
+			SA_CKPT_WR_ACTIVE_REPLICA_WEAK) {
+			printf("%s ", "SA_CKPT_WR_ACTIVE_REPLICA_WEAK");
+		}
+		if (status.checkpointCreationAttributes.creationFlags &
+			SA_CKPT_CHECKPOINT_COLLOCATED) {
+			printf("%s ", "SA_CKPT_CHECKPOINT_COLLOCATED");
+		}
+		printf("\nMax sections = %d\n",
+			(int)status.checkpointCreationAttributes.maxSections);
+		printf("Max section size = %d\n",
+			(int)status.checkpointCreationAttributes.maxSectionSize);
+		printf("Max section ID size = %d\n",
+			(int)status.checkpointCreationAttributes.maxSectionIdSize);
+		printf("Section count = %d\n", status.numberOfSections);
+		printf("\n");
+		
+		goto good;
+	}
+
+	printf("Retrying w/ create\n");
+
+	attrs.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
+	attrs.checkpointSize = (SaSizeT)maxsize;
+	attrs.retentionDuration = SA_TIME_ONE_HOUR;
+	attrs.maxSections = 1;
+	attrs.maxSectionSize = (SaSizeT)maxsize;
+	attrs.maxSectionIdSize = (SaSizeT)32;
+
+	flags = SA_CKPT_CHECKPOINT_READ |
+		SA_CKPT_CHECKPOINT_WRITE |
+		SA_CKPT_CHECKPOINT_CREATE;
+
+	err = saCkptCheckpointOpen(ds_ckpt,
+				   &newnode->kn_cpname,
+				   &attrs,
+				   flags,
+				   newnode->kn_timeout,
+				   &newnode->kn_cph);
+	if (err == SA_AIS_OK)
+		goto good;
+
+	/* No checkpoint */
+	free(newnode);
+	return err;
+good:
+
+	newnode->kn_ready = 1;
+	newnode->kn_next = key_list;
+	key_list = newnode;
+	printf("Opened ckpt %s\n", keyid);
+
+	return err;
+}
+
+
+int
+ds_key_init(char *keyid, int maxsize, int timeout)
+{
+	SaAisErrorT err;
+
+	pthread_mutex_lock(&ds_mutex);
+	err = ds_key_init_nt(keyid, maxsize, timeout);
+	pthread_mutex_unlock(&ds_mutex);
+
+	errno = ais_to_posix(err);
+	if (errno)
+		return -1;
+	return 0;
+}
+
+
+static SaAisErrorT
+ds_key_cleanup(key_node_t *node)
+{
+	if (!node || !node->kn_ready) {
+		printf("Key %s already freed\n", node->kn_keyid);
+		return SA_AIS_OK;
+	}
+
+	return saCkptCheckpointClose(node->kn_cph);
+}
+
+
+
+static SaAisErrorT
+ds_key_finish_nt(char *keyid)
+{
+	key_node_t *node;
+
+	node = kn_find_key(keyid);
+	/* TODO: Free list entry */
+
+	return ds_key_cleanup(node);
+}
+
+
+int
+ds_key_finish(char *keyid)
+{
+	SaAisErrorT err;
+
+	pthread_mutex_lock(&ds_mutex);
+	err = ds_key_finish_nt(keyid);
+	pthread_mutex_unlock(&ds_mutex);
+
+	errno = ais_to_posix(err);
+	if (errno)
+		return -1;
+	return 0;
+}
+
+
+
+static void
+open_callback(SaInvocationT invocation,
+	      SaCkptCheckpointHandleT handle,
+	      SaAisErrorT error)
+{
+	/* Do Open callback here.  Since we use sync calls instead
+	   of async calls, this is never used. */
+}
+
+
+static void
+sync_callback(SaInvocationT invocation,
+	      SaAisErrorT error)
+{
+	/* Do Sync callback here.  Since we use sync calls instead
+	   of async calls, this is never used. */
+}
+
+
+int
+ais_to_posix(SaAisErrorT err)
+{
+	switch (err) {
+	case SA_AIS_OK:
+		return 0;
+	case SA_AIS_ERR_LIBRARY:
+		return ELIBBAD;
+	case SA_AIS_ERR_VERSION:
+		return EPROTONOSUPPORT; //XXX
+	case SA_AIS_ERR_INIT:
+		return EFAULT; //XXX
+	case SA_AIS_ERR_TIMEOUT:
+		return ETIMEDOUT;
+	case SA_AIS_ERR_TRY_AGAIN:
+		return EAGAIN;
+	case SA_AIS_ERR_INVALID_PARAM:
+		return EINVAL;
+	case SA_AIS_ERR_NO_MEMORY:
+		return ENOMEM;
+	case SA_AIS_ERR_BAD_HANDLE:
+		return EBADF;
+	case SA_AIS_ERR_BUSY:
+		return EBUSY;
+	case SA_AIS_ERR_ACCESS:
+		return EACCES;
+	case SA_AIS_ERR_NOT_EXIST:
+		return ENOENT;
+	case SA_AIS_ERR_NAME_TOO_LONG:
+		return ENAMETOOLONG;
+	case SA_AIS_ERR_EXIST:
+		return EEXIST;
+	case SA_AIS_ERR_NO_SPACE:
+		return ENOSPC;
+	case SA_AIS_ERR_INTERRUPT:
+		return EINTR;
+	case SA_AIS_ERR_NAME_NOT_FOUND:
+		return ENOENT;
+	case SA_AIS_ERR_NO_RESOURCES:
+		return ENOMEM; //XXX
+	case SA_AIS_ERR_NOT_SUPPORTED:
+		return ENOSYS;
+	case SA_AIS_ERR_BAD_OPERATION:
+		return EINVAL; //XXX
+	case SA_AIS_ERR_FAILED_OPERATION:
+		return EIO; //XXX
+	case SA_AIS_ERR_MESSAGE_ERROR:
+		return EIO; // XXX
+	case SA_AIS_ERR_QUEUE_FULL:
+		return ENOBUFS;
+	case SA_AIS_ERR_QUEUE_NOT_AVAILABLE:
+		return ENOENT;
+	case SA_AIS_ERR_BAD_FLAGS:
+		return EINVAL;
+	case SA_AIS_ERR_TOO_BIG:
+		return E2BIG;
+	case SA_AIS_ERR_NO_SECTIONS:
+		return ENOENT; // XXX
+	}
+
+	return -1;
+}
+
+
+int
+ds_init(void)
+{
+	int ret = 0;
+	SaAisErrorT err;
+	SaVersionT ver;
+	SaCkptCallbacksT callbacks;
+
+	pthread_mutex_lock(&ds_mutex);
+	if (ds_ready) {
+		pthread_mutex_unlock(&ds_mutex);
+		return 0;
+	}
+
+	ver.releaseCode = 'B';
+	ver.majorVersion = 1;
+	ver.minorVersion = 1;
+
+	callbacks.saCkptCheckpointOpenCallback = open_callback;
+	callbacks.saCkptCheckpointSynchronizeCallback = sync_callback;
+
+	err = saCkptInitialize(&ds_ckpt, &callbacks, &ver);
+
+	if (err != SA_AIS_OK)
+		ret = -1;
+	else
+		ds_ready= 1;
+
+	pthread_mutex_unlock(&ds_mutex);
+
+	if (ret != 0)
+		errno = ais_to_posix(err);
+	return ret;
+}
+
+
+int
+ds_write(char *keyid, void *buf, size_t maxlen)
+{
+	key_node_t *node;
+	SaCkptIOVectorElementT iov = {SA_CKPT_DEFAULT_SECTION_ID,
+				      NULL, 0, 0, 0};
+	SaAisErrorT err;
+
+	//printf("writing to ckpt %s\n", keyid);
+
+	pthread_mutex_lock(&ds_mutex);
+
+	while ((node = kn_find_key(keyid)) == NULL) {
+
+		err = ds_key_init_nt(keyid,
+				(maxlen>DS_MIN_SIZE?maxlen:DS_MIN_SIZE), 5);
+		if (err != SA_AIS_OK)
+			goto out;
+	}
+
+	iov.dataBuffer = buf;
+	iov.dataSize = (SaSizeT)maxlen;
+	iov.dataOffset = 0;
+	iov.readSize = 0;
+
+	err = saCkptCheckpointWrite(node->kn_cph, &iov, 1, NULL);
+
+	if (err == SA_AIS_OK)
+		saCkptCheckpointSynchronize(node->kn_cph, node->kn_timeout);
+
+out:
+	pthread_mutex_unlock(&ds_mutex);
+	
+	errno = ais_to_posix(err);
+	if (errno)
+		return -1;
+	return maxlen; /* XXX */
+}
+
+
+int
+ds_read(char *keyid, void *buf, size_t maxlen)
+{
+	key_node_t *node;
+	SaCkptIOVectorElementT iov = {SA_CKPT_DEFAULT_SECTION_ID,
+				      NULL, 0, 0, 0};
+	SaAisErrorT err;
+
+	//printf("reading ckpt %s\n", keyid);
+
+	pthread_mutex_lock(&ds_mutex);
+
+	node = kn_find_key(keyid);
+	if (!node) {
+		pthread_mutex_unlock(&ds_mutex);
+		errno = ENOENT;
+		return -1;
+	}
+
+	iov.dataBuffer = buf;
+	iov.dataSize = (SaSizeT)maxlen;
+	iov.dataOffset = 0;
+	iov.readSize = 0;
+
+	err = saCkptCheckpointRead(node->kn_cph, &iov, 1, NULL);
+
+	pthread_mutex_unlock(&ds_mutex);
+	
+	errno = ais_to_posix(err);
+	if (errno)
+		return -1;
+	return iov.readSize; /* XXX */
+}
+
+
+int
+ds_finish(void)
+{
+	int ret = 0;
+	SaAisErrorT err;
+	key_node_t *node;
+
+	pthread_mutex_lock(&ds_mutex);
+	if (!ds_ready) {
+		pthread_mutex_unlock(&ds_mutex);
+		return 0;
+	}
+
+	/* Zap all the checkpoints */
+	for (node = key_list; node; node = node->kn_next) {
+		ds_key_cleanup(node);
+	}
+
+	err = saCkptFinalize(ds_ckpt);
+
+	if (err != SA_AIS_OK)
+		ret = -1;
+	else
+		ds_ready = 0;
+
+	pthread_mutex_unlock(&ds_mutex);
+
+	if (ret != 0)
+		errno = ais_to_posix(err);
+	return ret;
+}
+
+
+#ifdef STANDALONE
+void
+usage(int ret)
+{
+	printf("usage: ckpt <-r key|-w key -d data>\n");
+	exit(ret);
+}
+
+int
+main(int argc, char **argv)
+{
+	char *keyid = "testing";
+	char *val;
+	char buf[DS_MIN_SIZE];
+	int ret;
+	int op = 0;
+
+	while((ret = getopt(argc, argv, "w:r:d:j?")) != EOF) {
+		switch(ret) {
+		case 'w': 
+			op = 'w';
+			keyid = optarg;
+			break;
+		case 'r':
+			op = 'r';
+			keyid = optarg;
+			break;
+		case 'd':
+			val = optarg;
+			break;
+		case '?':
+		case 'h':
+			usage(0);
+		default:
+			usage(1);
+		}
+	}
+
+	if (!op) {
+		usage(1);
+	}
+
+	if (!keyid) {
+		usage(1);
+	}
+
+	if (ds_init() < 0) {
+		perror("ds_init");
+		return 1;
+	}
+
+	if (ds_key_init(keyid, DS_MIN_SIZE, 5) < 0) {
+		perror("ds_key_init");
+		return 1;
+	}
+
+	if (op == 'w') {
+		if (ds_write(keyid, val, strlen(val)+1) < 0) {
+			perror("ds_write");
+			return 1;
+		}
+	} else if (op == 'r') {
+		ret = ds_read(keyid, buf, sizeof(buf));
+		if (ret < 0) {
+			perror("ds_write");
+			return 1;
+		}
+
+		printf("%d bytes\nDATA for '%s':\n%s\n", ret, keyid,
+		       buf);
+	}
+
+	ds_key_finish(keyid);
+
+	if (ds_finish() < 0) {
+		perror("ds_finish");
+		return 0;
+	}
+
+	return 0;
+}
+#endif
--- cluster/rgmanager/src/clulib/alloc.c	2006/07/11 23:52:41	1.8
+++ cluster/rgmanager/src/clulib/alloc.c	2006/08/07 22:05:01	1.9
@@ -885,6 +885,12 @@
 #endif
 	void *newp;
 
+	if (oldp) {
+		oldb = block(oldp);
+		if (newsize <= oldb->mb_size)
+			return oldp;
+	}
+
 	newp = malloc(newsize);
 
 	if (!newp) {
--- cluster/rgmanager/src/clulib/cman.c	2006/07/11 23:52:41	1.1
+++ cluster/rgmanager/src/clulib/cman.c	2006/08/07 22:05:01	1.2
@@ -32,7 +32,7 @@
 #include <sys/socket.h>
 #include <fcntl.h>
 
-static cman_handle_t *_chandle = NULL;
+static cman_handle_t _chandle = NULL;
 static pthread_mutex_t _chandle_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t _chandle_cond = PTHREAD_COND_INITIALIZER;
 static pthread_t _chandle_holder = 0;
@@ -60,7 +60,7 @@
   @return		NULL / errno on failure; the global CMAN handle
 			on success.
  */
-cman_handle_t *
+cman_handle_t 
 cman_lock(int block, int preempt)
 {
 	pthread_t tid;
@@ -110,7 +110,7 @@
   @return		NULL / errno on failure; the global CMAN handle
 			on success.
  */
-cman_handle_t *
+cman_handle_t
 cman_lock_preemptible(int block, int *preempt_fd)
 {
 	pthread_t tid;
@@ -160,7 +160,7 @@
   @return		-1 on failure, 0 on success
  */
 int
-cman_unlock(cman_handle_t *ch)
+cman_unlock(cman_handle_t ch)
 {
 	int ret = -1;
 	char c;
--- cluster/rgmanager/src/clulib/message.c	2006/07/11 23:52:41	1.2
+++ cluster/rgmanager/src/clulib/message.c	2006/08/07 22:05:01	1.3
@@ -251,8 +251,17 @@
 void
 msg_print(msgctx_t *ctx)
 {
+	if (!ctx) {
+		printf("Attempt to call %s on NULL\n", __FUNCTION__);
+		return;
+	}
+
 	if (ctx->ops && ctx->ops->mo_print)
 		return ctx->ops->mo_print(ctx);
+
+	printf("Warning: Attempt to call %s on uninitialized context %p\n",
+	       __FUNCTION__, ctx);
+	printf("  ctx->type = %d\n", ctx->type);
 }
 
 
--- cluster/rgmanager/src/clulib/msg_cluster.c	2006/07/11 23:52:41	1.1
+++ cluster/rgmanager/src/clulib/msg_cluster.c	2006/08/07 22:05:01	1.2
@@ -31,10 +31,10 @@
 #include <errno.h>
 #include <signal.h>
 #include <signals.h>
+#include <gettid.h>
 #include <cman-private.h>
 
 /* Ripped from ccsd's setup_local_socket */
-#define MAX_CONTEXTS 32  /* Testing; production should be 1024-ish */
 
 int cluster_msg_close(msgctx_t *ctx);
 
@@ -55,6 +55,24 @@
 	  (ctx->u.local_info.sockfd != -1)))
 
 static msg_ops_t cluster_msg_ops;
+static void cluster_msg_print(msgctx_t *ctx);
+
+
+#define proto_error(ctx, msg, str) \
+do { \
+	printf("<<< CUT HERE >>>\n"); \
+	printf("[%d] PROTOCOL ERROR in %s: %s\n", gettid(), __FUNCTION__, str); \
+	msg_print(ctx); \
+	if (msg) { \
+	printf("  msg->msg_control = %d\n", ((cluster_msg_hdr_t *)msg)->msg_control); \
+	printf("  msg->src_ctx = %d\n", ((cluster_msg_hdr_t *)msg)->src_ctx); \
+	printf("  msg->dest_ctx = %d\n",  ((cluster_msg_hdr_t *)msg)->dest_ctx); \
+	printf("  msg->src_nodeid = %d\n",  ((cluster_msg_hdr_t *)msg)->src_nodeid); \
+	printf("  msg->msg_port = %d\n",  ((cluster_msg_hdr_t *)msg)->msg_port); \
+	} \
+	printf(">>> CUT HERE <<<\n"); \
+} while(0)
+
 
 
 static int
@@ -76,6 +94,7 @@
 	}
 
 	h->msg_control = M_DATA;
+	h->dest_nodeid = ctx->u.cluster_info.nodeid;
 	h->src_ctx = ctx->u.cluster_info.local_ctx;
 	h->dest_ctx = ctx->u.cluster_info.remote_ctx;
 	h->msg_port = ctx->u.cluster_info.port;
@@ -84,7 +103,8 @@
 
 	/*
 	printf("sending cluster message, length = %d to nodeid %d port %d\n",
-	       len + sizeof(*h), ctx->u.cluster_info.nodeid, ctx->u.cluster_info.port);
+	       len + sizeof(*h), ctx->u.cluster_info.nodeid,
+	       ctx->u.cluster_info.port);
 	 */
 
 	swab_cluster_msg_hdr_t(h);
@@ -93,7 +113,14 @@
 			       ctx->u.cluster_info.nodeid,
 			       ctx->u.cluster_info.port, 0);
 
-	return len + sizeof(h);
+	if (ret < 0)
+		return ret;
+
+	if (ret >= (len + sizeof(*h)))
+		return len;
+
+	errno = EAGAIN;
+	return -1;
 }
 
 
@@ -116,8 +143,8 @@
 			context_index = 1;
 
 		if (!contexts[context_index]) {
-			contexts[start] = ctx;
-			ctx->u.cluster_info.local_ctx = start;
+			contexts[context_index] = ctx;
+			ctx->u.cluster_info.local_ctx = context_index;
 			pthread_mutex_unlock(&context_lock);
 			return 0;
 		}
@@ -141,7 +168,7 @@
 	int fd, lfd, max;
 	struct timeval tv;
 	struct timeval *p = NULL;
-	cman_handle_t *ch;
+	cman_handle_t ch;
 
 	if (timeout >= 0) {
 		p = &tv;
@@ -171,7 +198,7 @@
 	FD_SET(lfd, &rfds);
 
 	max = (lfd > fd ? lfd : fd);
-	if (select(fd + 1, &rfds, NULL, NULL, p) > 0) {
+	if (select(max + 1, &rfds, NULL, NULL, p) > 0) {
 		/* Someone woke us up */
 		if (FD_ISSET(lfd, &rfds)) {
 			cman_unlock(ch);
@@ -199,6 +226,7 @@
 	int ret;
 
 	cm.msg_control = (uint8_t)type;
+	cm.dest_nodeid = ctx->u.cluster_info.nodeid;
 	cm.src_nodeid = _me;
 	cm.dest_ctx = ctx->u.cluster_info.remote_ctx;
 	cm.src_ctx = ctx->u.cluster_info.local_ctx;
@@ -515,7 +543,15 @@
 		free(n);
 		return 0;
 	default:
-		printf("PROTOCOL ERROR: Received %d\n", req);
+		pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+		n = ctx->u.cluster_info.queue;
+		list_remove(&ctx->u.cluster_info.queue, n);
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+		proto_error(ctx, n->message, "Illegal request on established pchannel");
+		if (n->message)
+			free(n->message);
+		free(n);
 		return -1;
 	}
 
@@ -570,6 +606,7 @@
 	/* Send open */
 	//printf("  Sending control message M_OPEN\n");
 	if (cluster_send_control_msg(ctx, M_OPEN) < 0) {
+		printf("Error sending control message\n");
 		return -1;
 	}
 
@@ -590,8 +627,7 @@
 		case M_NONE:
 			continue;
 		default: 
-			printf("PROTO ERROR: M_OPEN_ACK not received: %d %d\n",
-				ret, errno);
+			proto_error(ctx, NULL, "M_OPEN_ACK not received\n");
 		}
 	}
 
@@ -630,6 +666,7 @@
 		return -1;
 	}
 
+
 	pthread_mutex_lock(&context_lock);
 	/* Other threads should not be able to see this again */
 	if (contexts[ctx->u.cluster_info.local_ctx] &&
@@ -663,6 +700,8 @@
 	}
 	ctx->type = MSG_NONE;
 	ctx->ops = NULL;
+
+
 	return 0;
 }
 
@@ -672,6 +711,11 @@
 {
 	msg_q_t *node;
 
+	if (ctx->type == MSG_NONE) {
+		printf("Queue_for_context called w/o valid context\n");
+		raise(SIGSEGV);
+	}
+
 	while ((node = malloc(sizeof(*node))) == NULL) {
 		sleep(1);
 	}
@@ -744,6 +788,14 @@
 		return;
 	}
 
+	if (m->dest_nodeid != 0 && m->dest_nodeid != _me) {
+#ifdef DEBUG
+		printf("Skipping message meant for node %d (I am %d)\n",
+		       m->dest_nodeid, _me);
+#endif
+		return;
+	}
+
 	pthread_mutex_lock(&context_lock);
 
 	if (m->dest_ctx == 0 && m->msg_control == M_DATA) {
@@ -762,7 +814,19 @@
 			queue_for_context(contexts[x], buf, len);
 		}
 	} else if (contexts[m->dest_ctx]) {
-		/* Normal receive */
+
+#if 0
+		if (m->msg_control == M_OPEN_ACK) {
+			for (x = 0; x < MAX_CONTEXTS; x++) {
+				if (contexts[x] &&
+				    contexts[x]->dest_ctx == m->src_ctx) {
+					proto_error(contexts[x], m,
+						"Duplicate M_OPEN_ACK");
+				}
+			}
+		}
+#endif
+		
 		queue_for_context(contexts[m->dest_ctx], buf, len);
 	}
 	/* If none of the above, then we msg for something we've already
@@ -964,9 +1028,9 @@
 {
 	int e;
 	pthread_attr_t attrs;
-	cman_handle_t *ch = NULL;
+	cman_handle_t ch = NULL;
 	msgctx_t *ctx;
-	int port;
+	uint8_t port;
 
 	errno = EINVAL;
 	if (!portp)
@@ -988,12 +1052,12 @@
 
 	memset(contexts, 0, sizeof(contexts));
 
-	*cluster_ctx = ctx;
-	if (cman_start_recv_data(ch, process_cman_msg,
-				 port) != 0) {
+	if (cman_start_recv_data(ch, process_cman_msg, port) != 0) {
 		e = errno;
 		cman_unlock(ch);
-		msg_free_ctx((msgctx_t *)*cluster_ctx);
+		msg_free_ctx((msgctx_t *)ctx);
+
+		printf("Doom\n");
 		errno = e;
 		return -1;
 	}
@@ -1001,8 +1065,9 @@
 	if (cman_start_notification(ch, process_cman_event) != 0) {
 		e = errno;
 		cman_unlock(ch);
-		msg_free_ctx((msgctx_t *)*cluster_ctx);
+		msg_free_ctx((msgctx_t *)ctx);
 		errno = e;
+		return -1;
 	}
 
 	cman_unlock(ch);
@@ -1027,6 +1092,8 @@
 	ctx->flags = SKF_LISTEN | SKF_READ | SKF_WRITE | SKF_MCAST;
 	pthread_mutex_unlock(&context_lock);
 
+	*cluster_ctx = ctx;
+
        	pthread_attr_init(&attrs);
        	pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
        	pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
@@ -1057,7 +1124,7 @@
 int
 cluster_msg_shutdown(void)
 {
-	cman_handle_t *ch;
+	cman_handle_t ch;
 
 	ch = cman_lock(1, SIGUSR2);
 	cman_end_recv_data(ch);
--- cluster/rgmanager/src/clulib/msgtest.c	2006/07/11 23:52:41	1.1
+++ cluster/rgmanager/src/clulib/msgtest.c	2006/08/07 22:05:01	1.2
@@ -28,7 +28,7 @@
 #include <signal.h>
 #include <cman-private.h>
 
-#define MYPORT 190
+#define MYPORT 67
 
 int my_node_id = 0;
 int running = 1;
@@ -121,7 +121,7 @@
 
 
 void
-clu_initialize(cman_handle_t **ch)
+clu_initialize(cman_handle_t *ch)
 {
 	if (!ch)
 		exit(1);
@@ -198,23 +198,34 @@
 	pthread_t piggy, priv;
 	fd_set rfds;
 	int max = 0;
-	int port = MYPORT;
-	cman_handle_t *clu = NULL;
+	uint8_t port = MYPORT;
+	cman_handle_t clu = NULL;
 
 
 	clu_initialize(&clu);
 
+	if (clu == NULL) {
+		printf("Failed to connect to CMAN\n");
+	}
+
 	if (cman_init_subsys(clu) < 0) {
 		perror("cman_init_subsys");
 		return -1;
 	}
-        cman_get_node(clu, CMAN_NODEID_US, &me);
+
+	memset(&me, 0, sizeof(me));
+
+        if (cman_get_node(clu, CMAN_NODEID_US, &me) < 0) {
+		perror("cman_get_node");
+		return -1;
+	}
 
 	my_node_id = me.cn_nodeid;
+	printf("I am node ID %d\n", my_node_id);
 
-	if (msg_listen(MSG_CLUSTER, (void *)&port,
-	    me.cn_nodeid, &cluster_ctx) < 0) {
-		printf("Couldn't set up cluster message system\n");
+	if (msg_listen(MSG_CLUSTER, (void *)&port, me.cn_nodeid, &cluster_ctx) < 0) {
+		printf("Couldn't set up cluster message system: %s\n",
+			strerror(errno));
 		return -1;
 	}
 
--- cluster/rgmanager/src/clulib/vft.c	2006/07/11 23:52:41	1.14
+++ cluster/rgmanager/src/clulib/vft.c	2006/08/07 22:05:01	1.15
@@ -819,7 +819,8 @@
 		}
 
 		swab_generic_msg_hdr(hdrp);
-		if (hdrp->gh_command == VF_MESSAGE) {
+		if (hdrp->gh_command == VF_MESSAGE &&
+		    hdrp->gh_arg1 != VF_CURRENT) {
 			if (vf_process_msg(ctx, 0, hdrp, n) == VFR_COMMIT) {
 #ifdef DEBUG
 				printf("VFT: View committed\n");
--- cluster/rgmanager/src/daemons/main.c	2006/07/11 23:52:41	1.26
+++ cluster/rgmanager/src/daemons/main.c	2006/08/07 22:05:01	1.27
@@ -58,6 +58,7 @@
 int shutdown_pending = 0, running = 1, need_reconfigure = 0;
 char debug = 0; /* XXX* */
 static int signalled = 0;
+static int port = RG_PORT;
 
 uint64_t next_node_id(cluster_member_list_t *membership, uint64_t me);
 int rg_event_q(char *svcName, uint32_t state, int owner);
@@ -135,6 +136,38 @@
 
 	old_membership = member_list();
 	new_ml = get_member_list(h);
+
+	for (x = 0; x < new_ml->cml_count; x++) {
+
+		if (new_ml->cml_members[x].cn_member == 0)
+			continue;
+		if (new_ml->cml_members[x].cn_nodeid == my_id())
+			continue;
+
+		do {
+			quorate = cman_is_listening(h,
+					new_ml->cml_members[x].cn_nodeid,
+					port);
+			if (quorate == 0) {
+				clulog(LOG_DEBUG, "Node %d is not listening\n",
+					new_ml->cml_members[x].cn_nodeid);
+				new_ml->cml_members[x].cn_member = 0;
+			} else if (quorate == -1 && errno == EBUSY) {
+				usleep(50000);
+				continue;
+			}
+
+			if (quorate < 0) {
+				perror("cman_is_listening");
+			}
+
+			if (quorate > 0) {
+				printf("Node %d IS listenin\n", new_ml->cml_members[x].cn_nodeid);
+			}
+
+		} while(0);
+	}
+
 	member_list_update(new_ml);
 	cman_finish(h);
 
@@ -295,11 +328,21 @@
 {
 	int ret;
 	char state;
+#ifdef OPENAIS
+	msgctx_t everyone;
+#else
 	cluster_member_list_t *m = member_list();
+#endif
 
 	state = (req==RG_LOCK)?1:0;
+
+#ifdef OPENAIS
+	ret = ds_write("rg_lockdown", &state, 1);
+	clulog(LOG_INFO, "FIXME: send RG_LOCK update to all!\n");
+#else
 	ret = vf_write(m, VFF_IGN_CONN_ERRORS, "rg_lockdown", &state, 1);
 	free_member_list(m);
+#endif
 
 	if (ret == 0) {
 		msg_send_simple(ctx, RG_SUCCESS, 0, 0);
@@ -331,7 +374,7 @@
 	memset(msgbuf, 0, sizeof(msgbuf));
 
 	/* Peek-a-boo */
-	sz = msg_receive(ctx, msg_hdr, sizeof(msgbuf), 10);
+	sz = msg_receive(ctx, msg_hdr, sizeof(msgbuf), 1);
 	if (sz < sizeof (generic_msg_hdr)) {
 		clulog(LOG_ERR, "#37: Error receiving message header (%d)\n", sz);
 		goto out;
@@ -447,7 +490,10 @@
 		break;
 
 	case VF_MESSAGE:
-		/* Ignore; our VF thread handles these */
+		/* Ignore; our VF thread handles these
+		    - except for VF_CURRENT XXX (bad design) */
+		if (msg_hdr->gh_arg1 == VF_CURRENT)
+			vf_process_msg(ctx, 0, msg_hdr, sz);
 		break;
 
 	default:
@@ -478,7 +524,6 @@
 	
 	ret = msg_wait(ctx, 0);
 
-
 	switch(ret) {
 	case M_PORTOPENED:
 		msg_receive(ctx, NULL, 0, 0);
@@ -706,7 +751,7 @@
 
 
 void
-clu_initialize(cman_handle_t **ch)
+clu_initialize(cman_handle_t *ch)
 {
 	if (!ch)
 		exit(1);
@@ -765,10 +810,8 @@
 	cman_node_t me;
 	msgctx_t *cluster_ctx;
 	msgctx_t *local_ctx;
-	int port = RG_PORT;
 	pthread_t th;
-
-	cman_handle_t *clu = NULL;
+	cman_handle_t clu = NULL;
 
 	while ((rv = getopt(argc, argv, "fd")) != EOF) {
 		switch (rv) {
@@ -809,8 +852,16 @@
 
 	memset(&me, 0, sizeof(me));
         cman_get_node(clu, CMAN_NODEID_US, &me);
+
+	if (me.cn_nodeid == 0) {
+		printf("Unable to determine local node ID\n");
+		perror("cman_get_node");
+		return -1;
+	}
 	set_my_id(me.cn_nodeid);
 
+	clulog(LOG_INFO, "I am node #%d\n", my_id());
+
 	/*
 	   We know we're quorate.  At this point, we need to
 	   read the resource group trees from ccsd.
@@ -842,7 +893,7 @@
 		return -1;
 	}
 
-	if (msg_listen(MSG_CLUSTER, &port , me.cn_nodeid, &cluster_ctx) < 0) {
+	if (msg_listen(MSG_CLUSTER, &port, me.cn_nodeid, &cluster_ctx) < 0) {
 		clulog(LOG_CRIT,
 		       "#10b: Couldn't set up cluster message system: %s\n",
 		       strerror(errno));
@@ -859,12 +910,21 @@
 	/*
 	   Initialize the VF stuff.
 	 */
-	if (vf_init(me.cn_nodeid, RG_PORT, NULL, NULL) != 0) {
+#ifdef OPENAIS
+	if (ds_init() < 0) {
+		clulog(LOG_CRIT, "#11b: Couldn't initialize SAI AIS CKPT\n");
+		return -1;
+	}
+
+	ds_key_init("rg_lockdown", 32, 10);
+#else
+	if (vf_init(me.cn_nodeid, port, NULL, NULL) != 0) {
 		clulog(LOG_CRIT, "#11: Couldn't set up VF listen socket\n");
 		return -1;
 	}
 
 	vf_key_init("rg_lockdown", 10, NULL, lock_commit_cb);
+#endif
 
 	/*
 	   Do everything useful
--- cluster/rgmanager/src/daemons/rg_forward.c	2006/07/11 23:52:41	1.5
+++ cluster/rgmanager/src/daemons/rg_forward.c	2006/08/07 22:05:01	1.6
@@ -91,7 +91,7 @@
 		pthread_exit(NULL);
 	}
 
-	if (msg_receive(&ctx, &msg, sizeof(msg),10) != sizeof(msg)) {
+	if (msg_receive(&ctx, &msg, sizeof(msg), 600) != sizeof(msg)) {
 		msg_close(&ctx);
 		msg_close(req->rr_resp_ctx);
 		msg_free_ctx(req->rr_resp_ctx);
--- cluster/rgmanager/src/daemons/rg_state.c	2006/07/19 18:43:32	1.18
+++ cluster/rgmanager/src/daemons/rg_state.c	2006/08/07 22:05:01	1.19
@@ -21,7 +21,11 @@
 #include <platform.h>
 #include <message.h>
 #include <members.h>
+#ifdef OPENAIS
+#include <ds.h>
+#else
 #include <vf.h>
+#endif
 #include <stdio.h>
 #include <string.h>
 #include <resgroup.h>
@@ -147,7 +151,7 @@
 {
 	char res[256];
 
-	snprintf(res, sizeof(res), "usrm::rg=\"%s\"", name);
+	snprintf(res, sizeof(res), "rg=\"%s\"", name);
 	return clu_lock(LKM_EXMODE, p, 0, res);
 }
 
@@ -228,8 +232,9 @@
 	swab_SmMessageSt(msgp);
 	msg_send(req->rr_resp_ctx, msgp, sizeof(*msgp));
 
-	/* :) */
+	/* :( */
 	msg_close(req->rr_resp_ctx);
+	msg_free_ctx(req->rr_resp_ctx);
 	req->rr_resp_ctx = NULL;
 }
 
@@ -237,19 +242,27 @@
 int
 set_rg_state(char *name, rg_state_t *svcblk)
 {
-	cluster_member_list_t *membership;
 	char res[256];
+#ifndef OPENAIS
+	cluster_member_list_t *membership;
 	int ret;
+#endif
 
 	if (name)
 		strncpy(svcblk->rs_name, name, sizeof(svcblk->rs_name));
 
+	snprintf(res, sizeof(res), "rg=\"%s\"", name);
+#ifdef OPENAIS
+	if (ds_write(res, svcblk, sizeof(*svcblk)) < 0)
+		return -1;
+	return 0;
+#else
 	membership = member_list();
-	snprintf(res, sizeof(res), "usrm::rg=\"%s\"", name);
 	ret = vf_write(membership, VFF_IGN_CONN_ERRORS, res, svcblk,
        		       sizeof(*svcblk));
 	free_member_list(membership);
 	return ret;
+#endif
 }
 
 
@@ -272,18 +285,50 @@
 {
 	char res[256];
 	int ret;
-	void *data = NULL;
-	uint32_t datalen = 0;
+#ifdef OPENAIS
+	char data[DS_MIN_SIZE];
+	int datalen;
+#else
 	uint64_t viewno;
+	void *data = NULL;
 	cluster_member_list_t *membership;
+	uint32_t datalen = 0;
+#endif
 
 	/* ... */
 	if (name)
 		strncpy(svcblk->rs_name, name, sizeof(svcblk->rs_name));
 
-	membership = member_list();
+	snprintf(res, sizeof(res),"rg=\"%s\"", svcblk->rs_name);
 
-	snprintf(res, sizeof(res),"usrm::rg=\"%s\"", svcblk->rs_name);
+#ifdef OPENAIS
+	while((datalen = ds_read(res, data, sizeof(data))) < 0) {
+		if (errno == ENOENT) {
+			ds_key_init(res, DS_MIN_SIZE, 10);
+		} else {
+			return -1;
+		}
+	}
+
+	if (datalen < 0) {
+
+		ret = init_rg(name, svcblk);
+		if (ret < 0) {
+			printf("Couldn't initialize rg %s!\n", name);
+			return RG_EFAIL;
+		}
+
+		datalen = ds_read(res, &data, sizeof(data));
+		if (ret < 0) {
+			printf("Couldn't reread rg %s! (%d)\n", name, ret);
+			return RG_EFAIL;
+		}
+	}
+
+	memcpy(svcblk, data, sizeof(*svcblk));
+	return 0;
+#else
+	membership = member_list();
 	ret = vf_read(membership, res, &viewno, &data, &datalen);
 
 	if (ret != VFR_OK || datalen == 0) {
@@ -307,7 +352,7 @@
 		}
 	}
 
-	if (datalen != sizeof(*svcblk)) {
+	if (datalen < sizeof(*svcblk)) {
 		printf("Size mismatch; expected %d got %d\n",
 		       (int)sizeof(*svcblk), datalen);
 		if (data)
@@ -322,6 +367,7 @@
 	free_member_list(membership);
 
 	return 0;
+#endif
 }
 
 
@@ -331,22 +377,32 @@
 {
 	char res[256];
 	int ret;
+#ifdef OPENAIS
+	char data[1024];
+	int datalen;
+#else
 	void *data = NULL;
-	uint32_t datalen = 0;
 	uint64_t viewno;
+	uint32_t datalen;
+#endif
 
 	/* ... */
 	if (name)
 		strncpy(svcblk->rs_name, name, sizeof(svcblk->rs_name));
 
-	snprintf(res, sizeof(res),"usrm::rg=\"%s\"", svcblk->rs_name);
+	snprintf(res, sizeof(res),"rg=\"%s\"", svcblk->rs_name);
+
+#ifdef OPENAIS
+	ret = ds_read(res, data, sizeof(data));
+	if (ret <= 0) {
+#else
 	ret = vf_read_local(res, &viewno, &data, &datalen);
 
 	if (ret != VFR_OK || datalen == 0 ||
 	    datalen != sizeof(*svcblk)) {
 		if (data)
 			free(data);
-
+#endif
 		svcblk->rs_owner = 0;
 		svcblk->rs_last_owner = 0;
 		svcblk->rs_state = RG_STATE_UNINITIALIZED;
@@ -359,7 +415,9 @@
 
 	/* Copy out the data. */
 	memcpy(svcblk, data, sizeof(*svcblk));
+#ifndef OPENAIS
 	free(data);
+#endif
 	return 0;
 }
 
@@ -693,7 +751,7 @@
 	else
 		svcStatus.rs_restarts = 0;
 
-	if (set_rg_state(svcName, &svcStatus) != 0) {
+	if (set_rg_state(svcName, &svcStatus) < 0) {
 		clulog(LOG_ERR,
 		       "#47: Failed changing service status\n");
 		rg_unlock(&lockp);
@@ -1318,9 +1376,8 @@
 			/* state uncertain */
 			free_member_list(allowed_nodes);
 			clulog(LOG_DEBUG, "State Uncertain: svc:%s "
-			       "nid:%08x%08x req:%d\n", svcName,
-			       (uint32_t)(target>>32)&0xffffffff,
-			       (uint32_t)(target&0xffffffff), request);
+			       "nid:%08x req:%d\n", svcName,
+			       target, request);
 			return 0;
 		case 0:
 			*new_owner = target;
--- cluster/rgmanager/src/utils/clustat.c	2006/07/11 23:52:41	1.18
+++ cluster/rgmanager/src/utils/clustat.c	2006/08/07 22:05:01	1.19
@@ -168,24 +168,31 @@
 	while ((ret = malloc(sizeof(*ret))) == NULL)
 		sleep(1);
 	
-	x = 1;
-	while (1) {
+	x = 0;
+	while (++x) {
+		name = NULL;
 		snprintf(buf, sizeof(buf),
 			"/cluster/clusternodes/clusternode[%d]/@name", x);
 
 		if (ccs_get(desc, buf, &name) != 0)
 			break;
 
+		if (!name)
+			break;
+		if (!strlen(name)) {
+			free(name);
+			continue;
+		}
+
 		if (!nodes) {
 			nodes = malloc(x * sizeof(cman_node_t));
-			if (!nodes ) {
+			if (!nodes) {
 				perror("malloc");
 				ccs_disconnect(desc);
 				exit(1);
 			}
-			memset(nodes, 0, x * sizeof(cman_node_t));
 		} else {
-			nodes = realloc(ret, x * sizeof(cman_node_t));
+			nodes = realloc(nodes, x * sizeof(cman_node_t));
 			if (!nodes) {
 				perror("realloc");
 				ccs_disconnect(desc);
@@ -207,14 +214,11 @@
 		}
 
 		ret->cml_count = x;
-		++x;
 	}
 
 	ccs_disconnect(desc);
-
 	ret->cml_members = nodes;
 
-
 	return ret;
 }
 
@@ -615,7 +619,6 @@
 
 		/* Flag online nodes */
 		flag_nodes(all, part, FLAG_UP);
-
 		free_member_list(part);
 	} else {
 		/* not root - keep it simple for the next block */


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]