[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Cluster-devel] cluster/rgmanager/src clulib/Makefile clulib/a ...



CVSROOT:	/cvs/cluster
Module name:	cluster
Changes by:	lhh sourceware org	2006-07-11 23:52:42

Modified files:
	rgmanager/src/clulib: Makefile alloc.c lock.c lockspace.c 
	                      members.c message.c msgsimple.c 
	                      rg_strings.c signals.c vft.c 
	rgmanager/src/daemons: Makefile fo_domain.c groups.c main.c 
	                       nodeevent.c reslist.c restree.c 
	                       rg_forward.c rg_state.c test.c 
	rgmanager/src/resources: service.sh 
	rgmanager/src/utils: clustat.c clusvcadm.c 
Added files:
	rgmanager/src/clulib: cman.c locktest.c msg_cluster.c 
	                      msg_socket.c msgtest.c 
	rgmanager/src/daemons: rg_event.c 

Log message:
	- Make rgmanager actually do things.
	- Finish port of rgmanager to CMAN messaging.
	- Add feature to wait for nodes to be fenced prior to handling a
	node-down event.
	- Add direct DLM lock support.
	- Fix local communication.
	- Optimize VF data distribution algorithm to use CMAN/Totem's broadcast
	mode; this should make rgmanager much more scalable.
	- Add multiplexing for CMAN communications so threads can have pseudo
	private channels over the One CMAN socket.
	- Add service->service dependencies based on service events.
	- Add node ID display to clustat text-mode output.

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/cman.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/locktest.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msg_cluster.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msg_socket.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msgtest.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/Makefile.diff?cvsroot=cluster&r1=1.8&r2=1.9
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/alloc.c.diff?cvsroot=cluster&r1=1.7&r2=1.8
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lock.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lockspace.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/members.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/message.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msgsimple.c.diff?cvsroot=cluster&r1=1.5&r2=1.6
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/rg_strings.c.diff?cvsroot=cluster&r1=1.3&r2=1.4
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/signals.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/vft.c.diff?cvsroot=cluster&r1=1.13&r2=1.14
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_event.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/Makefile.diff?cvsroot=cluster&r1=1.12&r2=1.13
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/fo_domain.c.diff?cvsroot=cluster&r1=1.8&r2=1.9
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/groups.c.diff?cvsroot=cluster&r1=1.18&r2=1.19
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/main.c.diff?cvsroot=cluster&r1=1.25&r2=1.26
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/nodeevent.c.diff?cvsroot=cluster&r1=1.2&r2=1.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/reslist.c.diff?cvsroot=cluster&r1=1.13&r2=1.14
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/restree.c.diff?cvsroot=cluster&r1=1.19&r2=1.20
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_forward.c.diff?cvsroot=cluster&r1=1.4&r2=1.5
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_state.c.diff?cvsroot=cluster&r1=1.16&r2=1.17
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/test.c.diff?cvsroot=cluster&r1=1.4&r2=1.5
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/resources/service.sh.diff?cvsroot=cluster&r1=1.5&r2=1.6
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/utils/clustat.c.diff?cvsroot=cluster&r1=1.17&r2=1.18
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/utils/clusvcadm.c.diff?cvsroot=cluster&r1=1.8&r2=1.9

/cvs/cluster/cluster/rgmanager/src/clulib/cman.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/cman.c
+++ -	2006-07-11 23:52:43.661578000 +0000
@@ -0,0 +1,264 @@
+/*
+  Copyright Red Hat, Inc. 2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge, 
+  MA 02139, USA.
+*/
+/**
+  pthread mutex wrapper for a global CMAN handle
+ */
+#include <stdio.h>
+#include <pthread.h>
+#include <libcman.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+
+static cman_handle_t *_chandle = NULL;
+static pthread_mutex_t _chandle_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t _chandle_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t _chandle_holder = 0;
+static int _chandle_preempt = 0;
+static int _wakeup_pipe[2] = { -1, -1 };
+
+/**
+  Put a file descriptor into non-blocking mode.
+
+  @param fd		Descriptor to modify.  Failures are reported with
+			perror() only; nothing is returned to the caller.
+ */
+static void
+_set_nonblock(int fd)
+{
+	int flags;
+
+	flags = fcntl(fd, F_GETFL, 0);
+	if (flags < 0) {
+		/* Do not OR O_NONBLOCK into an error return value */
+		perror("fcntl");
+		return;
+	}
+
+	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
+		perror("fcntl");
+}
+
+
+/**
+  Lock / return the global CMAN handle.
+
+  @param block		If nonzero, we wait until the handle is released
+  @param preempt	If nonzero, *try* to wake up the holder who has
+			taken the lock with cman_lock_preemptible.  Will not
+			wake up holders which took it with cman_lock().
+  @return		NULL / errno on failure; the global CMAN handle
+			on success.  errno is ENOSYS if no handle is
+			registered, EDEADLK if the caller already holds it,
+			and EAGAIN if it is held and block is zero.
+ */
+cman_handle_t *
+cman_lock(int block, int preempt)
+{
+	pthread_t tid;
+	cman_handle_t *ret = NULL;
+
+	pthread_mutex_lock(&_chandle_lock);
+	if (_chandle == NULL) {
+		errno = ENOSYS;
+		goto out_unlock;
+	}
+
+	tid = pthread_self();
+	if (_chandle_holder == tid) {
+		errno = EDEADLK;
+		goto out_unlock;
+	}
+
+	if (_chandle_holder > 0) {
+		if (!block) {
+			errno = EAGAIN;
+			goto out_unlock;
+		}
+
+		/* Try to wake up the holder, but only if it took the handle
+		   with cman_lock_preemptible(); otherwise nobody would ever
+		   read the byte back out of the pipe. */
+		if (preempt && _chandle_preempt) {
+			if (write(_wakeup_pipe[1], "!", 1) < 0)
+				perror("cman_lock wakeup write");
+		}
+
+		/* Blocking call; do the cond-thing.  cman_unlock() uses a
+		   broadcast and condition waits may wake spuriously, so the
+		   predicate must be re-checked every time we wake up. */
+		while (_chandle_holder > 0)
+			pthread_cond_wait(&_chandle_cond, &_chandle_lock);
+
+		/* The handle may have been torn down while we slept */
+		if (_chandle == NULL) {
+			errno = ENOSYS;
+			goto out_unlock;
+		}
+	}
+		
+	_chandle_holder = tid;
+	ret = _chandle;
+out_unlock:
+	pthread_mutex_unlock(&_chandle_lock);
+	return ret;
+}
+
+
+/**
+  Lock / return the global CMAN handle, marking the holder as preemptible.
+
+  @param block		If nonzero, we wait until the handle is released
+  @param preempt_fd	Caller should include this file descriptor in
+			blocking calls to select(2), so that we can wake
+			it up if someone calls with cman_lock(xxx, 1);
+			must not be NULL (EINVAL).
+  @return		NULL / errno on failure; the global CMAN handle
+			on success.  errno is ENOSYS if no handle is
+			registered, EDEADLK if the caller already holds it,
+			and EAGAIN if it is held and block is zero.
+ */
+cman_handle_t *
+cman_lock_preemptible(int block, int *preempt_fd)
+{
+	pthread_t tid;
+	cman_handle_t *ret = NULL;
+
+	if (preempt_fd == NULL) {
+		errno = EINVAL;
+		return NULL;
+	}
+
+	pthread_mutex_lock(&_chandle_lock);
+	if (_chandle == NULL) {
+		errno = ENOSYS;
+		goto out_unlock;
+	}
+
+	tid = pthread_self();
+	if (_chandle_holder == tid) {
+		errno = EDEADLK;
+		goto out_unlock;
+	}
+
+	if (_chandle_holder > 0) {
+		if (!block) {
+			errno = EAGAIN;
+			goto out_unlock;
+		}
+
+		/* Blocking call; do the cond-thing.  cman_unlock() uses a
+		   broadcast and condition waits may wake spuriously, so the
+		   predicate must be re-checked every time we wake up. */
+		while (_chandle_holder > 0)
+			pthread_cond_wait(&_chandle_cond, &_chandle_lock);
+
+		/* The handle may have been torn down while we slept */
+		if (_chandle == NULL) {
+			errno = ENOSYS;
+			goto out_unlock;
+		}
+	}
+
+	*preempt_fd = _wakeup_pipe[0];
+	_chandle_holder = tid;
+	_chandle_preempt = 1;
+	ret = _chandle;
+out_unlock:
+	pthread_mutex_unlock(&_chandle_lock);
+	return ret;
+}
+
+
+/**
+  Release the global CMAN handle
+
+  @param ch		Should match the global handle
+  @return		-1 on failure, 0 on success.  errno is ENOSYS if no
+			handle is registered, EBUSY if the caller is not the
+			current holder, and EINVAL if ch is not the global
+			handle.
+ */
+int
+cman_unlock(cman_handle_t *ch)
+{
+	int ret = -1;
+	char c;
+
+	pthread_mutex_lock(&_chandle_lock);
+	if (_chandle == NULL) {
+		errno = ENOSYS;
+		goto out_unlock;
+	}
+
+	if (_chandle_holder != pthread_self() || !_chandle_holder) {
+		errno = EBUSY;
+		goto out_unlock;
+	}
+
+	if (_chandle != ch) {
+		errno = EINVAL;
+		goto out_unlock;
+	}
+
+	/* Empty wakeup pipe if we took it with the preempt flag.  Several
+	   waiters may each have written a byte, so drain all of them;
+	   a leftover byte would spuriously preempt the next holder.
+	   cman_init_subsys() requests non-blocking mode on the read end,
+	   so the loop stops once the pipe is empty. */
+	if (_chandle_preempt) {
+		while (read(_wakeup_pipe[0], &c, 1) == 1)
+			;
+	}
+
+	_chandle_preempt = 0;
+	_chandle_holder = 0;
+	ret = 0;
+
+out_unlock:
+	pthread_mutex_unlock(&_chandle_lock);
+	/* Wake every waiter; each re-acquires the mutex in turn */
+	if (ret == 0) 
+		pthread_cond_broadcast(&_chandle_cond);
+	return ret;
+}
+
+
+/**
+  Register the global CMAN handle and create the wakeup pipe.
+
+  @param ch		Handle to store as the global handle
+  @return		0 on success; -1 on failure.  errno is EAGAIN when
+			a handle is already registered or ch is NULL;
+			otherwise it is whatever pipe(2) left behind.
+ */
+int
+cman_init_subsys(cman_handle_t *ch)
+{
+	int rv = -1;
+
+	pthread_mutex_lock(&_chandle_lock);
+
+	if (_chandle || !ch) {
+		/* Already initialized, or nothing to register */
+		errno = EAGAIN;
+	} else if (pipe(_wakeup_pipe) >= 0) {
+		/* Only the read end is switched to non-blocking mode */
+		_set_nonblock(_wakeup_pipe[0]);
+		_chandle = ch;
+		_chandle_holder = 0;
+		rv = 0;
+	}
+
+	pthread_mutex_unlock(&_chandle_lock);
+	return rv;
+}
+
+
+/**
+  Forget the global CMAN handle and close the wakeup pipe.  If some thread
+  currently holds the handle, this waits until it has been released.
+
+  @return		0 on success; -1 with errno EAGAIN if no handle is
+			registered.
+ */
+int
+cman_cleanup_subsys(void)
+{
+	int ret = -1;
+
+	pthread_mutex_lock(&_chandle_lock);
+	if (!_chandle) {
+		errno = EAGAIN;
+		goto out_unlock;
+	}
+
+	/* Re-check the predicate after every wakeup: the release is
+	   announced with a broadcast, so another waiter may have taken
+	   the handle before we got the mutex back. */
+	while (_chandle_holder > 0) {
+		pthread_cond_wait(&_chandle_cond, &_chandle_lock);
+	}
+
+	ret = 0;
+	_chandle = NULL;
+	_chandle_holder = 0;
+	_chandle_preempt = 0;
+
+	close(_wakeup_pipe[0]);
+	close(_wakeup_pipe[1]);
+	/* Do not leave stale descriptor numbers behind */
+	_wakeup_pipe[0] = -1;
+	_wakeup_pipe[1] = -1;
+	
+out_unlock:
+	pthread_mutex_unlock(&_chandle_lock);
+	return ret;
+}
+
+
+/**
+  Hand a buffer to cman_send_data() using the global handle.  As the name
+  says, _chandle_lock is not taken here.
+
+  @return		Whatever cman_send_data() returns.
+ */
+int
+cman_send_data_unlocked(void *buf, int len, int flags,
+			uint8_t port, int nodeid)
+{
+	int rv;
+
+	rv = cman_send_data(_chandle, buf, len, flags, port, nodeid);
+	return rv;
+}
/cvs/cluster/cluster/rgmanager/src/clulib/locktest.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/locktest.c
+++ -	2006-07-11 23:52:43.746107000 +0000
@@ -0,0 +1,85 @@
+/*
+  Copyright Red Hat, Inc. 2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge, 
+  MA 02139, USA.
+*/
+#include <lock.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <pthread.h>
+#include <signal.h>
+
+
+/**
+  Thread body: repeatedly take and release an exclusive lock on the
+  resource named by arg.
+
+  @param arg		Name of the lock, as a NUL-terminated string
+  @return		NULL, once taking the lock fails
+ */
+void *
+lock_thread(void *arg)
+{
+	struct dlm_lksb lksb;
+
+	while(1) {
+		/* Start from a clean status block, as main() does */
+		memset(&lksb, 0, sizeof(lksb));
+
+		printf("Taking lock..\n");
+		if (clu_lock(LKM_EXMODE, &lksb, 0, arg) < 0) {
+			/* Do not claim (or release) a lock we never got */
+			perror("clu_lock");
+			break;
+		}
+		printf("Thread acquired lock on %s\n", (char *)arg);
+		clu_unlock(&lksb);
+	}
+
+	return NULL;
+}
+
+
+
+
+/**
+  Lock test driver.  argv[1] names the lock to take; when exactly two
+  arguments are given, a second thread contends for the same lock.
+
+  @return		0 on success, 1 on any failure
+ */
+int
+main(int argc, char **argv)
+{
+	struct dlm_lksb lksb;
+	int ret;
+	pthread_t th;
+	char *name;
+
+	/* Validate arguments before setting anything up, so the early
+	   exit does not skip clu_lock_finished() */
+	if (argc < 2) {
+		printf("Lock what?\n");
+		return 1;
+	}
+
+	if (clu_lock_init("Testing") != 0) {
+		perror("clu_lock_init");
+		return 1;
+	}
+
+	if (argc == 3) {
+		name = strdup(argv[1]);
+		if (!name) {
+			perror("strdup");
+			clu_lock_finished("Testing");
+			return 1;
+		}
+
+		/* pthread_create returns the error number directly */
+		ret = pthread_create(&th, NULL, lock_thread, name);
+		if (ret != 0) {
+			fprintf(stderr, "pthread_create: %s\n",
+				strerror(ret));
+			clu_lock_finished("Testing");
+			return 1;
+		}
+	}
+
+	memset(&lksb,0,sizeof(lksb));
+	ret = clu_lock(LKM_EXMODE, &lksb, 0, argv[1]);
+	if (ret < 0) {
+		perror("clu_lock");
+		clu_lock_finished("Testing");
+		return 1;
+	}
+
+	printf("Acquired lock on %s; press enter to release\n", argv[1]);
+	getchar();
+
+	clu_unlock(&lksb);
+
+	if (argc == 3) {
+		printf("Press enter to kill lock thread...\n");
+		getchar();
+		/* NOTE(review): no SIGTERM handler is installed in this
+		   file, so this presumably ends the whole process rather
+		   than only the thread -- confirm that is intended. */
+		pthread_kill(th, SIGTERM);
+	}
+
+	clu_lock_finished("Testing");
+
+	return 0;
+}
/cvs/cluster/cluster/rgmanager/src/clulib/msg_cluster.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/msg_cluster.c
+++ -	2006-07-11 23:52:43.828222000 +0000
@@ -0,0 +1,1104 @@
+/*
+  Copyright Red Hat, Inc. 2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge, 
+  MA 02139, USA.
+*/
+#define _MESSAGE_BUILD
+#include <message.h>
+#include <stdio.h>
+#include <pthread.h>
+#include <libcman.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <time.h>
+#include <sys/time.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <signal.h>
+#include <signals.h>
+#include <cman-private.h>
+
+/* Ripped from ccsd's setup_local_socket */
+#define MAX_CONTEXTS 32  /* Testing; production should be 1024-ish */
+
+int cluster_msg_close(msgctx_t *ctx);
+
+/* Context 0 is reserved for control messages */
+
+/* Local-ish contexts */
+static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER;
+static msgctx_t *contexts[MAX_CONTEXTS];
+static int _me = 0;
+pthread_t comms_thread;
+int thread_running;
+
+/* Nonzero when a context is usable: a cluster context needs both a remote
+   and a local context ID, a socket context needs a valid descriptor.  The
+   argument is parenthesized so that any pointer expression may be passed. */
+#define is_established(ctx) \
+	((((ctx)->type == MSG_CLUSTER) && \
+ 	  ((ctx)->u.cluster_info.remote_ctx && \
+	   (ctx)->u.cluster_info.local_ctx)) || \
+	 (((ctx)->type == MSG_SOCKET) && \
+	  ((ctx)->u.local_info.sockfd != -1)))
+
+static msg_ops_t cluster_msg_ops;
+
+
+/**
+  Send a data message over a cluster context.  The payload is prefixed
+  with a cluster_msg_hdr_t and handed to cman_send_data_unlocked().
+
+  @param ctx		Cluster context; must be writable (SKF_WRITE)
+  @param msg		Payload to send
+  @param len		Payload length; header plus payload must fit in
+			the 4096 byte staging buffer (E2BIG otherwise)
+  @return		len on success; -1 on failure (errno EINVAL for a
+			bad context, E2BIG for an oversized payload, or as
+			left by the send call).
+ */
+static int
+cluster_msg_send(msgctx_t *ctx, void *msg, size_t len)
+{
+	char buf[4096];
+	cluster_msg_hdr_t *h = (void *)buf;
+	int ret;
+	char *msgptr = (buf + sizeof(*h));
+
+	errno = EINVAL;
+	if (ctx->type != MSG_CLUSTER)
+		return -1;
+	if (!(ctx->flags & SKF_WRITE))
+		return -1;
+	if ((len + sizeof(*h)) > sizeof(buf)) {
+		errno = E2BIG;
+		return -1;
+	}
+
+	h->msg_control = M_DATA;
+	h->src_ctx = ctx->u.cluster_info.local_ctx;
+	h->dest_ctx = ctx->u.cluster_info.remote_ctx;
+	h->msg_port = ctx->u.cluster_info.port;
+	memcpy(msgptr, msg, len);
+	h->src_nodeid = _me;
+
+	/*
+	printf("sending cluster message, length = %d to nodeid %d port %d\n",
+	       len + sizeof(*h), ctx->u.cluster_info.nodeid, ctx->u.cluster_info.port);
+	 */
+
+	swab_cluster_msg_hdr_t(h);
+
+	/* Argument order is (buf, len, flags, port, nodeid): no flags,
+	   and the destination node goes last. */
+	ret = cman_send_data_unlocked((void *)h, len + sizeof(*h), 0,
+			       ctx->u.cluster_info.port,
+			       ctx->u.cluster_info.nodeid);
+	if (ret < 0)
+		return -1;
+
+	/* Report the payload size, not header + sizeof(pointer) */
+	return (int)len;
+}
+
+
+/**
+  Assign a (free) cluster context ID if possible.  Slots are searched
+  round-robin starting after the most recently tried index; slot 0 is
+  never handed out.
+
+  @param ctx		Context to register; its local_ctx is set to the
+			chosen slot on success
+  @return		0 on success; -1 with errno EAGAIN if every slot
+			is in use.
+ */
+static int
+assign_ctx(msgctx_t *ctx)
+{
+	int start;
+	static uint32_t context_index = 1;
+
+	/* Assign context index */
+	pthread_mutex_lock(&context_lock);
+
+	start = context_index;
+	do {
+		context_index++;
+		if (context_index >= MAX_CONTEXTS)
+			context_index = 1;
+
+		if (!contexts[context_index]) {
+			/* Claim the slot that was just found free, not the
+			   one the search started from */
+			contexts[context_index] = ctx;
+			ctx->u.cluster_info.local_ctx = context_index;
+			pthread_mutex_unlock(&context_lock);
+			return 0;
+		}
+	} while (context_index != start);
+	
+	pthread_mutex_unlock(&context_lock);
+
+	errno = EAGAIN;
+	return -1;
+}
+
+
+/* See if anything's on the cluster socket.  If so, dispatch it
+   on to the requisite queues
+   XXX should be passed a connection arg!
+
+   timeout is in seconds; a negative value waits indefinitely.
+   Returns 0 after dispatching (or if no CMAN descriptor is available),
+   -1 if nothing arrived, the handle could not be locked, or we were
+   preempted by another thread (errno EAGAIN in that last case). */
+static int
+poll_cluster_messages(int timeout)
+{
+	int ret = -1;
+	fd_set rfds;
+	int fd, lfd, max, e;
+	struct timeval tv;
+	struct timeval *p = NULL;
+	cman_handle_t *ch;
+
+	if (timeout >= 0) {
+		p = &tv;
+		tv.tv_sec = timeout;
+		tv.tv_usec = 0;
+	}
+
+	FD_ZERO(&rfds);
+
+	/* This sucks - it could cause other threads trying to get a
+	   membership list to block for a long time.  Now, that should not
+	   happen.  Basically, when we get a membership event, we generate 
+	   a new membership list in a locally cached copy and reference
+	   that.
+
+	 */
+	ch = cman_lock_preemptible(1, &lfd);
+	if (!ch) {
+		/* Nothing is locked, so there is nothing to poll or
+		   unlock; keep errno intact across the printf */
+		e = errno;
+		printf("%s\n", strerror(e));
+		errno = e;
+		return -1;
+	}
+
+	fd = cman_get_fd(ch);
+	if (fd < 0) {
+		cman_unlock(ch);
+		return 0;
+	}
+	FD_SET(fd, &rfds);
+	FD_SET(lfd, &rfds);
+
+	/* select() must be told about the highest descriptor, or the
+	   wakeup descriptor is never examined when it is the larger */
+	max = (lfd > fd ? lfd : fd);
+	if (select(max + 1, &rfds, NULL, NULL, p) > 0) {
+		/* Someone woke us up */
+		if (FD_ISSET(lfd, &rfds)) {
+			cman_unlock(ch);
+			errno = EAGAIN;
+			return -1;
+		}
+
+		cman_dispatch(ch, 0);
+		ret = 0;
+	}
+	cman_unlock(ch);
+
+	return ret;
+}
+
+
+/**
+  This is used to establish and tear down pseudo-private
+  contexts which are shared with the cluster context.
+
+  @param ctx		Context supplying node, port and context IDs
+  @param type		Control message type (stored in msg_control)
+  @return		Whatever cman_send_data_unlocked() returns
+ */
+static int
+cluster_send_control_msg(msgctx_t *ctx, int type)
+{
+	cluster_msg_hdr_t cm;
+	int ret;
+
+	/* Do not put uninitialized stack bytes on the wire */
+	memset(&cm, 0, sizeof(cm));
+
+	cm.msg_control = (uint8_t)type;
+	cm.src_nodeid = _me;
+	cm.dest_ctx = ctx->u.cluster_info.remote_ctx;
+	cm.src_ctx = ctx->u.cluster_info.local_ctx;
+	cm.msg_port = ctx->u.cluster_info.port;
+
+	swab_cluster_msg_hdr_t(&cm);
+
+	/* Argument order is (buf, len, flags, port, nodeid): no flags,
+	   and the destination node goes last. */
+	ret = (cman_send_data_unlocked((void *)&cm, sizeof(cm), 0,
+			       ctx->u.cluster_info.port,
+			       ctx->u.cluster_info.nodeid));
+	return ret;
+}
+
+
+/**
+  Wait for a message on a context.
+
+  @param ctx		Cluster context with SKF_READ or SKF_LISTEN set
+  @param timeout	Seconds to wait.  With zero (or a negative value)
+			the absolute deadline stays at 0, so the wait is
+			expected to return at once.
+  @return		msg_control of the message at the head of the
+			queue, M_NONE if nothing arrived, or -1 with errno
+			EINVAL for an unusable context.
+ */
+static int
+cluster_msg_wait(msgctx_t *ctx, int timeout)
+{
+	struct timeval now;
+	struct timespec ts = {0, 0};
+	int req = M_NONE;
+	int e;
+
+	errno = EINVAL;
+	if (!ctx)
+		return -1;
+	if (ctx->type != MSG_CLUSTER)
+		return -1;
+	if (!(ctx->flags & (SKF_READ | SKF_LISTEN)))
+		return -1;
+
+	if (timeout > 0) {
+		/* Build the absolute deadline from a real timeval rather
+		   than writing through a cast timespec pointer */
+		gettimeofday(&now, NULL);
+		ts.tv_sec = now.tv_sec + timeout;
+		ts.tv_nsec = now.tv_usec * 1000;
+	}
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	while (1) {
+
+		/* See if we dispatched any messages on to our queue */
+		if (ctx->u.cluster_info.queue) {
+			req = ctx->u.cluster_info.queue->message->msg_control;
+			/*printf("Queue not empty CTX%d : %d\n",
+			  	 ctx->u.cluster_info.local_ctx, req);*/
+			break;
+		}
+
+		/* Ok, someone else has the mutex on our FD.  Go to
+	   	   sleep on a cond; maybe they'll wake us up */
+		e = pthread_cond_timedwait(&ctx->u.cluster_info.cond,
+		    			   &ctx->u.cluster_info.mutex,
+		   			   &ts);
+
+		if (timeout == 0) {
+			break;
+		}
+
+		/* Signalled: go look at the queue again.  ETIMEDOUT or
+		   any other error ends the wait instead of spinning. */
+		if (e != 0) {
+			break;
+		}
+	}
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+	return req;
+}
+
+
+/**
+  Add the context's select-pipe read descriptor to an fd_set, creating
+  the pipe on first use.
+
+  @param ctx		Cluster context
+  @param fds		Set to add the descriptor to
+  @param max		If non-NULL, raised to the descriptor when larger
+  @return		0 on success; -1 on failure (errno EINVAL for bad
+			arguments, or as left by pipe(2)).
+ */
+static int
+cluster_msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max)
+{
+	int rfd, saved;
+	msg_q_t *curr;
+	
+	errno = EINVAL;
+	if (!ctx || !fds || ctx->type != MSG_CLUSTER)
+		return -1;
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+
+	if (ctx->u.cluster_info.select_pipe[0] < 0) {
+		if (pipe(ctx->u.cluster_info.select_pipe) < 0) {
+			/* Unlocking must not clobber pipe()'s errno */
+			saved = errno;
+			pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+			errno = saved;
+			return -1;
+		}
+
+		/* The pipe is brand new, so it knows nothing about the
+		   messages already sitting on the queue.  Write one byte
+		   per queued message so a caller that now switches from
+		   msg_wait() to select() still gets told about them. */
+		list_do(&ctx->u.cluster_info.queue, curr) {
+			write(ctx->u.cluster_info.select_pipe[1], "", 1);
+		} while (!list_done(&ctx->u.cluster_info.queue, curr));
+	}
+
+	rfd = ctx->u.cluster_info.select_pipe[0];
+	FD_SET(rfd, fds);
+
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+	if (max && (*max < rfd))
+		*max = rfd;
+	return 0;
+}
+
+
+/**
+  Test whether the context's select-pipe read descriptor is in fds.
+
+  @return		1 if the pipe exists and is set in fds, 0 if not,
+			-1 with errno EINVAL for bad arguments.
+ */
+int
+cluster_msg_fd_isset(msgctx_t *ctx, fd_set *fds)
+{
+	int rfd;
+	int hit = 0;
+
+	errno = EINVAL;
+
+	if (!fds || !ctx || ctx->type != MSG_CLUSTER)
+		return -1;
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	rfd = ctx->u.cluster_info.select_pipe[0];
+	if (rfd >= 0 && FD_ISSET(rfd, fds))
+		hit = 1;
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+	return hit;
+}
+
+
+/**
+  Remove the context's select-pipe read descriptor from fds.
+
+  @return		1 if the pipe exists (and was cleared from fds),
+			0 if there is no pipe, -1 with errno EINVAL for
+			bad arguments.
+ */
+int
+cluster_msg_fd_clr(msgctx_t *ctx, fd_set *fds)
+{
+	int rfd;
+	int cleared = 0;
+
+	errno = EINVAL;
+
+	if (!fds || !ctx || ctx->type != MSG_CLUSTER)
+		return -1;
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	rfd = ctx->u.cluster_info.select_pipe[0];
+	if (rfd >= 0) {
+		FD_CLR(rfd, fds);
+		cleared = 1;
+	}
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+	return cleared;
+}
+
+
+/**
+  Take the first message off a context's queue and act on it.
+
+  M_DATA: the header is stripped in place and the buffer is handed to
+  the caller through *msg (the caller frees it); if msg is NULL the
+  payload is dropped with a warning.  M_CLOSE / M_OPEN_ACK update the
+  context's remote_ctx.  Anything else is discarded.
+
+  @param ctx		Cluster context to read from
+  @param msg		Out: payload buffer for M_DATA, else NULL
+  @param len		Out: payload length for M_DATA, else 0
+  @return		Payload length for M_DATA; 0 for M_CLOSE and
+			M_OPEN_ACK; -1 for other message types, for an
+			empty queue (EAGAIN) or a bad local_ctx (EBADF).
+ */
+static int
+_cluster_msg_receive(msgctx_t *ctx, void **msg, size_t *len)
+{
+	cluster_msg_hdr_t *hdr;
+	msg_q_t *node;
+	size_t payload;
+	int rv;
+	char junk;
+
+	if (msg)
+		*msg = NULL;
+	if (len)
+		*len = 0;
+
+	if (ctx->u.cluster_info.local_ctx < 0 ||
+	    ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
+		errno = EBADF;
+		return -1;
+	}
+
+	/* Dequeue under the context mutex */
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+
+	node = ctx->u.cluster_info.queue;
+	if (node == NULL) {
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+		errno = EAGAIN;
+		return -1;
+	}
+
+	list_remove(&ctx->u.cluster_info.queue, node);
+
+	/* One byte was written to the select pipe per queued message;
+	   take one back out for the message just removed */
+	if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+		read(ctx->u.cluster_info.select_pipe[0], &junk, 1);
+	}
+
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+	hdr = node->message;
+
+	if (hdr->msg_control == M_DATA) {
+		payload = node->len - sizeof(*hdr);
+
+		/* Slide the payload over the header so the caller gets a
+		   buffer which starts with its own data */
+		memmove(hdr, &hdr[1], payload);
+
+		if (msg) {
+			*msg = (void *)hdr;
+		} else {
+			printf("Warning: dropping data message\n");
+			free(hdr);
+		}
+
+		if (len)
+			*len = payload;
+
+		free(node);
+		return (int)payload;
+	}
+
+	rv = 0;
+	switch(hdr->msg_control) {
+	case M_CLOSE:
+		ctx->u.cluster_info.remote_ctx = 0;
+		break;
+	case M_OPEN_ACK:
+		/* Response to new connection */
+		ctx->u.cluster_info.remote_ctx = hdr->src_ctx;
+		break;
+	default:
+		/* M_OPEN (someone trying to connect) and anything
+		   unknown are not handled here */
+		rv = -1;
+		break;
+	}
+
+	free(hdr);
+	free(node);
+
+	return rv;
+}
+
+
+/**
+  Receive a message from a cluster-context.  Waits (up to timeout) for
+  something to arrive, then copies at most maxlen bytes of a data
+  message into the caller's buffer.  Event-style control messages are
+  consumed and reported as 0.
+
+  @return		Payload length as reported by _cluster_msg_receive()
+			for data (which may exceed maxlen); 0 if nothing
+			was queued or an event was consumed; -1 on error
+			(ECONNRESET when the head of the queue is M_CLOSE).
+ */
+static int
+cluster_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	int kind, got;
+	msg_q_t *head;
+	void *payload;
+	size_t payload_len;
+	char junk;
+
+	errno = EINVAL;
+	if (!ctx || !(ctx->flags & SKF_READ))
+		return -1;
+
+	kind = cluster_msg_wait(ctx, timeout);
+
+	switch (kind) {
+	case 0:
+		/* Nothing on queue */
+		return 0;
+
+	case M_CLOSE:
+		errno = ECONNRESET;
+		return -1;
+
+	case M_DATA:
+		got = _cluster_msg_receive(ctx, &payload, &payload_len);
+		if (got < 0) {
+			printf("Ruh roh!\n");
+			return -1;
+		}
+
+		/* Never copy more than the caller has room for */
+		if (payload_len >= maxlen)
+			payload_len = maxlen;
+
+		if (msg && maxlen)
+			memcpy(msg, payload, payload_len);
+		free(payload);
+		return got;
+
+	case M_STATECHANGE:
+	case M_PORTOPENED:
+	case M_PORTCLOSED:
+	case M_TRY_SHUTDOWN:
+		/* Event notification: just take it off the queue */
+		pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+
+		head = ctx->u.cluster_info.queue;
+		if (head == NULL) {
+			pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+			errno = EAGAIN;
+			return -1;
+		}
+
+		list_remove(&ctx->u.cluster_info.queue, head);
+
+		if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+			read(ctx->u.cluster_info.select_pipe[0], &junk, 1);
+		}
+	
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+		if (head->message)
+			free(head->message);
+		free(head);
+		return 0;
+
+	default:
+		printf("PROTOCOL ERROR: Received %d\n", kind);
+		return -1;
+	}
+
+	/* Every case above returns */
+	printf("%s: CODE PATH ERROR\n", __FUNCTION__);
+	return -1;
+}
+
+
+/**
+  Open a connection to the specified node ID.
+  If the specified node is CMAN_NODEID_US, the context becomes a
+  broadcast pseudo-context and no handshake takes place.
+
+  @param type		Must be MSG_CLUSTER
+  @param nodeid		Destination node
+  @param port		Cluster port to use
+  @param ctx		Context to set up
+  @param timeout	Number of one-second waits for the M_OPEN_ACK
+  @return		0 on success; -1 on failure.  On any failure after a
+			context slot was assigned, the context is closed
+			again before returning.
+ */
+static int
+cluster_msg_open(int type, int nodeid, int port, msgctx_t *ctx, int timeout)
+{
+	int t = 0, ret, e;
+
+	errno = EINVAL;
+	if (!ctx)
+		return -1;
+
+	if (type != MSG_CLUSTER)
+		return -1;
+
+	/*printf("Opening pseudo channel to node %d\n", nodeid);*/
+	
+	ctx->type = MSG_CLUSTER;
+	ctx->ops = &cluster_msg_ops;
+	ctx->flags = 0;
+	ctx->u.cluster_info.nodeid = nodeid;
+	ctx->u.cluster_info.port = port;
+	ctx->u.cluster_info.local_ctx = -1;
+	ctx->u.cluster_info.remote_ctx = 0;
+	ctx->u.cluster_info.queue = NULL;
+
+	/* Assign context index */
+	if (assign_ctx(ctx) < 0) {
+		errno = EAGAIN;
+		return -1;
+	}
+	ctx->flags = SKF_READ | SKF_WRITE;
+
+	if (nodeid == CMAN_NODEID_US) {
+		/* Broadcast pseudo ctx; no handshake needed */
+		ctx->flags |= SKF_MCAST;
+		return 0;
+	}
+
+	//printf("  Local CTX: %d\n", ctx->u.cluster_info.local_ctx);
+
+	/* Send open */
+	//printf("  Sending control message M_OPEN\n");
+	if (cluster_send_control_msg(ctx, M_OPEN) < 0) {
+		/* Give the context slot back, as the timeout path below
+		   does; cluster_msg_close() overwrites errno, so keep
+		   the send error for the caller. */
+		e = errno;
+		cluster_msg_close(ctx);
+		errno = e;
+		return -1;
+	}
+
+	/* Ok, wait for a response */
+	while (!is_established(ctx)) {
+		++t;
+		if (t > timeout) {
+			cluster_msg_close(ctx);
+			errno = ETIMEDOUT;
+			return -1;
+		}
+			
+		ret = cluster_msg_wait(ctx, 1);
+		switch(ret) {
+		case M_OPEN_ACK:
+			/* Consuming the ACK records the remote context */
+			_cluster_msg_receive(ctx, NULL, NULL);
+			break;
+		case M_NONE:
+			continue;
+		default: 
+			printf("PROTO ERROR: M_OPEN_ACK not received: %d %d\n",
+				ret, errno);
+		}
+	}
+
+	/*	
+	printf("  Remote CTX: %d\n",
+	       ctx->u.cluster_info.remote_ctx);
+	printf("  Pseudo channel established!\n");
+	*/
+	return 0;
+}
+
+
+/**
+  Close a connection context (cluster or socket; it doesn't matter)
+  In the case of a cluster context, we need to clear out the 
+  receive queue and what-not.  This isn't a big deal.  Also, we
+  need to tell the other end that we're done -- just in case it does
+  not know yet ;)
+
+  With a socket, the O/S cleans up the buffers for us.
+
+  @param ctx		Cluster context to close
+  @return		0 on success; -1 (errno EINVAL) if ctx is NULL, is
+			not a cluster context, or its local context ID is
+			too large.
+ */
+int
+cluster_msg_close(msgctx_t *ctx)
+{
+	msg_q_t *n = NULL;
+
+	errno = EINVAL;
+
+	if (!ctx)
+		return -1;
+	if (ctx->type != MSG_CLUSTER)
+		return -1;
+
+	if (ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
+		printf("Context invalid during close\n");
+		return -1;
+	}
+
+	/* cluster_msg_open() leaves local_ctx at -1 until a slot has been
+	   assigned; such a context owns no slot, and -1 must not be used
+	   as an index.  The rest of the cleanup still applies. */
+	if (ctx->u.cluster_info.local_ctx >= 0) {
+		pthread_mutex_lock(&context_lock);
+		/* Other threads should not be able to see this again */
+		if (contexts[ctx->u.cluster_info.local_ctx] &&
+		    (contexts[ctx->u.cluster_info.local_ctx]->u.cluster_info.local_ctx ==
+			ctx->u.cluster_info.local_ctx)) {
+			//printf("reclaimed context %d\n", 
+				//ctx->u.cluster_info.local_ctx);
+			contexts[ctx->u.cluster_info.local_ctx] = NULL;
+		}
+		pthread_mutex_unlock(&context_lock);
+	}
+
+	/* Clear receive queue */
+	while ((n = ctx->u.cluster_info.queue) != NULL) {
+		list_remove(&ctx->u.cluster_info.queue, n);
+		free(n->message);
+		free(n);
+	}
+	/* Send close message */
+	if (ctx->u.cluster_info.remote_ctx != 0) {
+		cluster_send_control_msg(ctx, M_CLOSE);
+	}
+
+	/* Close pipe if it's open */
+	if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+		close(ctx->u.cluster_info.select_pipe[0]);
+		ctx->u.cluster_info.select_pipe[0] = -1;
+	}
+	if (ctx->u.cluster_info.select_pipe[1] >= 0) {
+		close(ctx->u.cluster_info.select_pipe[1]);
+		ctx->u.cluster_info.select_pipe[1] = -1;
+	}
+	ctx->type = MSG_NONE;
+	ctx->ops = NULL;
+	return 0;
+}
+
+
+/**
+  Copy a raw message and append it to a context's receive queue, then
+  notify anyone waiting on the context (select pipe and condition).
+  Allocation failures are retried once a second until they succeed.
+
+  @param ctx		Destination context
+  @param buf		Message bytes (header included); copied
+  @param len		Number of bytes in buf
+ */
+static void 
+queue_for_context(msgctx_t *ctx, char *buf, int len)
+{
+	msg_q_t *entry;
+
+	/* Queue node */
+	for (;;) {
+		entry = malloc(sizeof(*entry));
+		if (entry != NULL)
+			break;
+		sleep(1);
+	}
+	memset(entry, 0, sizeof(*entry));
+
+	/* Private copy of the message */
+	for (;;) {
+		entry->message = malloc(len);
+		if (entry->message != NULL)
+			break;
+		sleep(1);
+	}
+	memcpy(entry->message, buf, len);
+	entry->len = len;
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	list_insert(&ctx->u.cluster_info.queue, entry);
+	/* A select()-based reader is told via one byte per message */
+	if (ctx->u.cluster_info.select_pipe[1] >= 0 &&
+	    write(ctx->u.cluster_info.select_pipe[1], "", 1) < 0) {
+		perror("queue_for_context write");
+	}
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+	/* A reader sleeping in the condition wait is told here */
+	pthread_cond_signal(&ctx->u.cluster_info.cond);
+}
+
+
+/**
+  Data callback: takes one message received on the cluster socket and
+  places a copy on the queue of every context it is addressed to.
+  A M_DATA message for context 0 goes to every readable broadcast
+  context; anything else goes to the context named in the header, if
+  that context still exists.
+ */
+static void
+process_cman_msg(cman_handle_t h, void *priv, char *buf, int len,
+	    uint8_t port, int nodeid)
+{
+	cluster_msg_hdr_t *m = (cluster_msg_hdr_t *)buf;
+	msgctx_t *target;
+	int i;
+
+	if (len < sizeof(*m)) {
+		printf("Message too short.\n");
+		return;
+	}
+
+	/* Header arrives in wire byte order */
+	swab_cluster_msg_hdr_t(m);
+
+#ifdef DEBUG
+	printf("Processing ");
+	switch(m->msg_control) {
+	case M_NONE: 
+		printf("M_NONE\n");
+		break;
+	case M_OPEN:
+		printf("M_OPEN\n");
+		break;
+	case M_OPEN_ACK: 
+		printf("M_OPEN_ACK\n");
+		break;
+	case M_DATA: 
+		printf("M_DATA\n");
+		break;
+	case M_CLOSE: 
+		printf("M_CLOSE\n");
+		break;
+	}
+
+	printf("  Node ID: %d %d\n", m->src_nodeid, nodeid);
+	printf("  Remote CTX: %d  Local CTX: %d\n", m->src_ctx, m->dest_ctx);
+#endif
+
+	if (m->dest_ctx >= MAX_CONTEXTS || m->dest_ctx < 0) {
+		printf("Context invalid; ignoring\n");
+		return;
+	}
+
+	pthread_mutex_lock(&context_lock);
+
+	if (m->dest_ctx == 0 && m->msg_control == M_DATA) {
+		/* Broadcast data: every readable multicast cluster
+		   context gets its own copy */
+		for (i = 0; i < MAX_CONTEXTS; i++) {
+			target = contexts[i];
+			if (target &&
+			    target->type == MSG_CLUSTER &&
+			    (target->flags & SKF_MCAST) &&
+			    (target->flags & SKF_READ))
+				queue_for_context(target, buf, len);
+		}
+	} else {
+		/* Normal receive.  A missing context means we already
+		   detached it from the table; the message is ignored. */
+		target = contexts[m->dest_ctx];
+		if (target)
+			queue_for_context(target, buf, len);
+	}
+
+	pthread_mutex_unlock(&context_lock);
+}
+
+
+/**
+  Accept a new pseudo-private connection coming in over the
+  cluster socket.
+
+  @param listenctx	Listening context (local context 0, SKF_LISTEN)
+  @param acceptctx	Context to fill in for the new connection
+  @return		0 if an M_OPEN was found and acknowledged; -1
+			otherwise.  errno is EINVAL for bad arguments and
+			EAGAIN when no M_OPEN is queued; if no context ID
+			could be assigned, errno is as left by assign_ctx().
+ */
+static int
+cluster_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
+{
+	cluster_msg_hdr_t *m;
+	msg_q_t *n;
+	char done = 0;
+	char foo;
+	int ret = -1;
+	int err = EAGAIN;	/* reported if no M_OPEN is on the queue */
+
+	errno = EINVAL;
+	if (!listenctx || !acceptctx)
+		return -1;
+	if (listenctx->u.cluster_info.local_ctx != 0)
+		return -1;
+	if (!(listenctx->flags & SKF_LISTEN))
+		return -1;
+
+	listenctx->ops->mo_init(acceptctx);
+
+	pthread_mutex_lock(&listenctx->u.cluster_info.mutex);
+
+	n = listenctx->u.cluster_info.queue;
+	if (n == NULL) {
+		pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
+		errno = EAGAIN;
+		return -1;
+	}
+
+	/* the OPEN should be the first thing on the list; this loop
+	   is probably not necessary */
+	list_do(&listenctx->u.cluster_info.queue, n) {
+
+		m = n->message;
+		switch(m->msg_control) {
+		case M_OPEN:
+			list_remove(&listenctx->u.cluster_info.queue, n);
+			/*printf("Accepting connection from %d %d\n",
+			  	 m->src_nodeid, m->src_ctx);*/
+
+			/* New connection */
+			pthread_mutex_init(&acceptctx->u.cluster_info.mutex,
+					   NULL);
+			pthread_cond_init(&acceptctx->u.cluster_info.cond,
+					  NULL);
+			acceptctx->u.cluster_info.queue = NULL;
+			acceptctx->u.cluster_info.remote_ctx = m->src_ctx;
+			acceptctx->u.cluster_info.nodeid = m->src_nodeid;
+			acceptctx->u.cluster_info.port = m->msg_port;
+			acceptctx->flags = (SKF_READ | SKF_WRITE);
+
+			if (assign_ctx(acceptctx) < 0) {
+				/* Without a local context ID there is
+				   nothing valid to acknowledge with;
+				   fail instead of sending a bogus ACK */
+				err = errno;
+				printf("FAILED TO ASSIGN CONTEXT\n");
+				acceptctx->flags = 0;
+			} else {
+				cluster_send_control_msg(acceptctx,
+							 M_OPEN_ACK);
+				ret = 0;
+			}
+
+			/* The request is consumed either way, so take its
+			   byte back out of the select pipe */
+			if (listenctx->u.cluster_info.select_pipe[0] >= 0) {
+				//printf("%s read\n", __FUNCTION__);
+				read(listenctx->u.cluster_info.select_pipe[0],
+				     &foo, 1);
+			}
+
+			done = 1;
+			free(m);
+			free(n);
+
+			break;
+		case M_DATA:
+			/* Data messages (i.e. from broadcast msgs) are
+			   okay too!...  but we don't handle them here */
+			break;
+		default:
+			/* Other message?! */
+			printf("Odd... %d\n", m->msg_control);
+			break;
+		}
+
+		/* n has been freed once done is set; leave before
+		   list_done() looks at it */
+		if (done)
+			break;
+
+	} while (!list_done(&listenctx->u.cluster_info.queue, n));
+
+	pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
+
+	/* Only report success when a connection was really set up */
+	if (ret < 0)
+		errno = err;
+	return ret;
+}
+
+
+/**
+  Body of the cluster communications thread.  Calls
+  poll_cluster_messages() over and over for as long as thread_running
+  is non-zero.  (Per the original design note: this waits for events on
+  the cluster comms FD and dispatches them using cman_dispatch; a model
+  with no permanent threads proved difficult to implement correctly.)
+
+  @param arg	Unused.
+  @return	Always NULL.
+ */
+static void *
+cluster_comms_thread(void *arg)
+{
+	/* SIGUSR2 will cause select() to abort */
+	for (;;) {
+		if (!thread_running)
+			break;
+		poll_cluster_messages(2);
+	}
+
+	return NULL;
+}
+
+
+/*
+   Transliterates a CMAN event to a control message and queues it on
+   the cluster context (contexts[0]).
+
+   The queued message is a cluster_msg_hdr_t immediately followed by one
+   int holding `arg`.  If contexts[0] is not set the event is discarded.
+   After queuing, one byte is written to the context's select pipe (when
+   one exists) and the context's condition variable is signalled.
+ */
+static void
+process_cman_event(cman_handle_t handle, void *private, int reason, int arg)
+{
+	const size_t alloc_len = sizeof(int)*2 + sizeof(cluster_msg_hdr_t);
+	cluster_msg_hdr_t *hdr;
+	msg_q_t *qnode;
+	msgctx_t *cctx;
+
+#if 0
+	printf("EVENT: %p %p %d %d\n", handle, private, reason, arg);
+#endif
+
+	/* Allocate queue node; keep retrying once a second until it works */
+	for (;;) {
+		qnode = malloc(sizeof(*qnode));
+		if (qnode != NULL)
+			break;
+		sleep(1);
+	}
+	memset(qnode, 0, sizeof(*qnode));
+
+	/* Allocate message: header + int (for arg) */
+	for (;;) {
+		hdr = malloc(alloc_len);
+		if (hdr != NULL)
+			break;
+		sleep(1);
+	}
+	memset(hdr, 0, alloc_len);
+
+	/* Reasons not listed here leave msg_control at its zeroed value */
+	switch(reason) {
+#if defined(LIBCMAN_VERSION) && LIBCMAN_VERSION >= 2
+	case CMAN_REASON_PORTOPENED:
+		hdr->msg_control = M_PORTOPENED;
+		break;
+	case CMAN_REASON_TRY_SHUTDOWN:
+		hdr->msg_control = M_TRY_SHUTDOWN;
+		break;
+#endif
+	case CMAN_REASON_PORTCLOSED:
+		hdr->msg_control = M_PORTCLOSED;
+		break;
+	case CMAN_REASON_STATECHANGE:
+		hdr->msg_control = M_STATECHANGE;
+		break;
+	}
+
+	/* The event argument lives directly behind the header */
+	*(int *)((char *)hdr + sizeof(cluster_msg_hdr_t)) = arg;
+
+	qnode->message = hdr;
+	qnode->len = sizeof(cluster_msg_hdr_t) + sizeof(int);
+
+	pthread_mutex_lock(&context_lock);
+	cctx = contexts[0]; /* This is the cluster context... */
+	pthread_mutex_unlock(&context_lock);
+
+	if (cctx == NULL) {
+		/* We received a close for something we've already
+		   detached from our list.  No big deal, just
+		   ignore. */
+		free(hdr);
+		free(qnode);
+		return;
+	}
+
+	pthread_mutex_lock(&cctx->u.cluster_info.mutex);
+	list_insert(&cctx->u.cluster_info.queue, qnode);
+	/* If a select pipe was set up, wake it up */
+	if (cctx->u.cluster_info.select_pipe[1] >= 0 &&
+	    write(cctx->u.cluster_info.select_pipe[1], "", 1) < 0) {
+		//printf("PROCESS_CMAN_EVENT write\n");
+		perror("process_cman_event write");
+	}
+	pthread_mutex_unlock(&cctx->u.cluster_info.mutex);
+	pthread_cond_signal(&cctx->u.cluster_info.cond);
+}
+
+
+/**
+  Set up the cluster listening context and start the comms thread.
+
+  Registers process_cman_msg (data) and process_cman_event
+  (notifications) with CMAN for the given port, installs a freshly
+  allocated context as contexts[0] and spawns a detached thread running
+  cluster_comms_thread().
+
+  @param me		Local node ID; stored in _me.
+  @param portp		Points to an int holding the port; values
+			outside 10..254 are rejected.
+  @param cluster_ctx	Receives the new listening context on success;
+			set to NULL if CMAN registration fails.
+  @return		0 on success, -1 on failure (errno = EINVAL for
+			bad arguments, otherwise the errno left by the
+			failing CMAN call).
+ */
+int
+cluster_msg_listen(int me, void *portp, msgctx_t **cluster_ctx)
+{
+	int e;
+	pthread_attr_t attrs;
+	cman_handle_t *ch = NULL;
+	msgctx_t *ctx;
+	int port;
+
+	errno = EINVAL;
+	if (!portp || !cluster_ctx)
+		return -1;
+	port = *(int *)portp;
+	if (port < 10 || port > 254)
+		return -1;
+
+	ch = cman_lock(1, 0);
+	_me = me;
+
+	/* Set up cluster context */
+	ctx = msg_new_ctx();
+	if (!ctx) {
+		cman_unlock(ch);
+		errno = EINVAL;
+		return -1;
+	}
+
+	memset(contexts, 0, sizeof(contexts));
+
+	*cluster_ctx = ctx;
+	if (cman_start_recv_data(ch, process_cman_msg,
+				 port) != 0) {
+		e = errno;
+		cman_unlock(ch);
+		msg_free_ctx(ctx);
+		*cluster_ctx = NULL;
+		errno = e;
+		return -1;
+	}
+
+	if (cman_start_notification(ch, process_cman_event) != 0) {
+		e = errno;
+		/* Undo the data registration made just above */
+		cman_end_recv_data(ch);
+		cman_unlock(ch);
+		msg_free_ctx(ctx);
+		*cluster_ctx = NULL;
+		errno = e;
+		/* Must bail out here: falling through would unlock CMAN
+		   a second time and keep using the freed context. */
+		return -1;
+	}
+
+	cman_unlock(ch);
+	/* Done with CMAN bits */
+
+	pthread_mutex_lock(&context_lock);
+
+	memset(contexts, 0, sizeof(contexts));
+	contexts[0] = ctx;
+
+	ctx->type = MSG_CLUSTER;
+	ctx->ops = &cluster_msg_ops;
+	ctx->u.cluster_info.local_ctx = 0;
+	ctx->u.cluster_info.remote_ctx = 0;
+	ctx->u.cluster_info.port = port; /* port! */
+	ctx->u.cluster_info.nodeid = 0; /* Broadcast! */
+	ctx->u.cluster_info.select_pipe[0] = -1;
+	ctx->u.cluster_info.select_pipe[1] = -1;
+	ctx->u.cluster_info.queue = NULL;
+	pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
+	pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
+	ctx->flags = SKF_LISTEN | SKF_READ | SKF_WRITE | SKF_MCAST;
+	pthread_mutex_unlock(&context_lock);
+
+	pthread_attr_init(&attrs);
+	pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
+	pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+	/* Must be set before the thread starts; its loop tests this flag */
+	thread_running = 1;
+	pthread_create(&comms_thread, &attrs, cluster_comms_thread, NULL);
+
+	pthread_attr_destroy(&attrs);
+
+	return 0;
+}
+
+
+/**
+  Print a cluster message context (address, flags, node ID, local and
+  remote context numbers) to stdout.  Does nothing for a NULL context.
+ */
+static void
+cluster_msg_print(msgctx_t *ctx)
+{
+	if (ctx != NULL) {
+		printf("Cluster Message Context %p\n", ctx);
+		printf("  Flags %08x\n", ctx->flags);
+		printf("  Node ID %d\n", ctx->u.cluster_info.nodeid);
+		printf("  Local CTX %d\n", ctx->u.cluster_info.local_ctx);
+		printf("  Remote CTX %d\n", ctx->u.cluster_info.remote_ctx);
+	}
+}
+
+
+/**
+  Shut down cluster messaging: while holding the CMAN lock, stop
+  receiving CMAN data and send SIGTERM to the comms thread.
+
+  @return	Always 0.
+ */
+int
+cluster_msg_shutdown(void)
+{
+	cman_handle_t *ch;
+
+	ch = cman_lock(1, SIGUSR2);
+	cman_end_recv_data(ch);
+	/* NOTE(review): thread_running is not cleared here, and the comms
+	   thread's own comment talks about SIGUSR2 rather than SIGTERM --
+	   confirm this really makes cluster_comms_thread() exit. */
+	pthread_kill(comms_thread, SIGTERM);
+	cman_unlock(ch);
+
+	return 0;
+}
+
+
+/**
+  Initialize a message context as a cluster context: zero it, set its
+  type and ops table, mark both select-pipe descriptors unused (-1) and
+  initialize its mutex and condition variable.
+
+  @param ctx	Context to initialize.
+  @return	0 on success, -1 (errno = EINVAL) if ctx is NULL.
+		Note that errno is set to EINVAL on entry in every case.
+ */
+int
+cluster_msg_init(msgctx_t *ctx)
+{
+	errno = EINVAL;
+	if (ctx == NULL)
+		return -1;
+
+	memset(ctx, 0, sizeof(*ctx));
+
+	ctx->ops = &cluster_msg_ops;
+	ctx->type = MSG_CLUSTER;
+	ctx->u.cluster_info.select_pipe[1] = -1;
+	ctx->u.cluster_info.select_pipe[0] = -1;
+
+	pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
+	pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
+
+	return 0;
+}
+
+
+/* Operations table for cluster contexts: maps each generic mo_* entry
+   point onto the cluster_msg_* implementation in this file. */
+static msg_ops_t cluster_msg_ops = {
+	.mo_open = cluster_msg_open,
+	.mo_close = cluster_msg_close,
+	.mo_listen = cluster_msg_listen,
+	.mo_accept = cluster_msg_accept,
+	.mo_shutdown = cluster_msg_shutdown,
+	.mo_wait = cluster_msg_wait,
+	.mo_send = cluster_msg_send,
+	.mo_receive = cluster_msg_receive,
+	.mo_fd_set = cluster_msg_fd_set,
+	.mo_fd_isset = cluster_msg_fd_isset,
+	.mo_fd_clr = cluster_msg_fd_clr,
+	.mo_print = cluster_msg_print,
+	.mo_init = cluster_msg_init
+};
/cvs/cluster/cluster/rgmanager/src/clulib/msg_socket.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/msg_socket.c
+++ -	2006-07-11 23:52:43.913279000 +0000
@@ -0,0 +1,432 @@
+/*
+  Copyright Red Hat, Inc. 2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge, 
+  MA 02139, USA.
+*/
+#define _MESSAGE_BUILD
+#include <message.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/un.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <fdops.h>
+
+/* Ripped from ccsd's setup_local_socket */
+#define RGMGR_SOCK "/var/run/cluster/rgmanager.sk"
+
+static msg_ops_t sock_msg_ops;
+
+/**
+  Create a PF_LOCAL stream socket and connect it to RGMGR_SOCK.
+
+  @return	The connected descriptor, or a negative value on failure
+		with errno left as set by the failing socket()/connect().
+ */
+static int
+sock_connect(void)
+{
+	struct sockaddr_un addr;
+	int fd, saved_errno;
+
+	memset(&addr, 0, sizeof(addr));
+	addr.sun_family = PF_LOCAL;
+	snprintf(addr.sun_path, sizeof(addr.sun_path), RGMGR_SOCK);
+
+	fd = socket(PF_LOCAL, SOCK_STREAM, 0);
+	if (fd < 0)
+		return fd;
+
+	if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
+		/* close() may clobber errno; keep connect()'s value */
+		saved_errno = errno;
+		close(fd);
+		errno = saved_errno;
+		return -1;
+	}
+
+	return fd;
+}
+
+
+/**
+  Send a message over a local-socket context.
+
+  The payload is copied into a 4096-byte stack buffer behind a
+  local_msg_hdr_t (msg_control = M_DATA, msg_len = len) and handed to
+  _write_retry() in one call.
+
+  @param ctx	Context to send on; must have SKF_WRITE set.
+  @param msg	Payload.
+  @param len	Payload length in bytes; header + payload must fit in
+		the 4096-byte buffer.
+  @return	Number of payload bytes written (header excluded), or -1:
+		errno = EINVAL for bad arguments, E2BIG if the message
+		does not fit, EAGAIN if fewer bytes than a header were
+		written; if _write_retry() itself fails, errno is left
+		as that call set it.
+ */
+static int
+sock_msg_send(msgctx_t *ctx, void *msg, size_t len)
+{
+	char buf[4096];
+	int ret;
+	local_msg_hdr_t *h = (local_msg_hdr_t *)buf;
+	char *msgptr = (buf + sizeof(*h));
+
+	errno = EINVAL;
+	if (!ctx || (!msg && len))
+		return -1;
+	if (!(ctx->flags & SKF_WRITE))
+		return -1;
+
+	/* encapsulate ... ? */
+	/* Written as a subtraction so a huge len cannot wrap around and
+	   slip past the check */
+	if (len > sizeof(buf) - sizeof(*h)) {
+		errno = E2BIG;
+		return -1;
+	}
+
+	h->msg_control = M_DATA;
+	h->msg_len = len;
+	if (len)
+		memcpy(msgptr, msg, len);
+
+	ret = _write_retry(ctx->u.local_info.sockfd, buf,
+			    len + sizeof(*h), NULL);
+
+	/* Test the sign first: compared directly against sizeof(), a
+	   negative ret is converted to a huge unsigned value and would
+	   be mistaken for a successful write. */
+	if (ret < 0)
+		return -1;
+	if ((size_t)ret >= sizeof(*h))
+		return (ret - (int)sizeof(*h));
+
+	errno = EAGAIN;
+	return -1;
+}
+
+
+/**
+  Peek (MSG_PEEK) at the next local message header on fd without
+  consuming it.
+
+  @param fd	Socket descriptor.
+  @return	msg_control of the pending header when a full header is
+		available; M_CLOSE if recv() returned 0 or only a partial
+		header; -1 if recv() failed with anything but EINTR.
+ */
+static int
+peekypeeky(int fd)
+{
+	local_msg_hdr_t hdr;
+	int n;
+
+	/* Retry for as long as the call is merely interrupted */
+	do {
+		n = recv(fd, (void *)&hdr, sizeof(hdr), MSG_PEEK);
+	} while (n < 0 && errno == EINTR);
+
+	if (n < 0)
+		return -1;
+
+	if (n == 0) {
+		/* Socket closed? */
+		return M_CLOSE;
+	}
+
+	if (n != sizeof(hdr)) {
+		/* XXX */
+		printf("PROTOCOL ERROR: Invalid message\n");
+		return M_CLOSE;
+	}
+
+	return hdr.msg_control;
+}
+
+
+/**
+  Wait until the context's socket is readable, then report what kind of
+  message is pending.
+
+  @param ctx		Local-socket context (not checked for NULL).
+  @param timeout	Seconds to wait; a negative value passes a NULL
+			timeout to _select_retry().
+  @return		The result of peekypeeky() when _select_retry()
+			reports exactly one ready descriptor, otherwise
+			M_NONE.
+ */
+static int
+sock_msg_wait(msgctx_t *ctx, int timeout)
+{
+	const int fd = ctx->u.local_info.sockfd;
+	struct timeval tmo = {0, 0};
+	fd_set readable;
+	int nready;
+
+	if (timeout >= 0)
+		tmo.tv_sec = timeout;
+
+	FD_ZERO(&readable);
+	FD_SET(fd, &readable);
+
+	nready = _select_retry(fd + 1, &readable, NULL, NULL,
+			       (timeout >= 0) ? &tmo : NULL);
+	if (nready != 1)
+		return M_NONE;
+
+	return peekypeeky(fd);
+}
+
+
+/**
+  Add the context's socket to an fd_set and track the highest
+  descriptor seen.
+
+  @param ctx	Local-socket context (type must be MSG_SOCKET).
+  @param fds	Set to add the descriptor to.
+  @param max	If non-NULL, raised to the socket's descriptor when that
+		is larger than the current value.
+  @return	0 if the descriptor was added; -1 (errno = EINVAL) for
+		NULL/mistyped arguments or when the context has no open
+		socket.
+ */
+static int
+sock_msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max)
+{
+	errno = EINVAL;
+	/* Same argument checks as sock_msg_fd_isset()/sock_msg_fd_clr() */
+	if (!fds || !ctx)
+		return -1;
+	if (ctx->type != MSG_SOCKET)
+		return -1;
+
+	if (ctx->u.local_info.sockfd >= 0) {
+		FD_SET(ctx->u.local_info.sockfd, fds);
+		if (max && ctx->u.local_info.sockfd > *max)
+			*max = ctx->u.local_info.sockfd;
+		return 0;
+	}
+
+	return -1;
+}
+
+
+/**
+  Test whether the context's socket is a member of an fd_set.
+
+  @return	1 if the context has an open socket and it is set in fds,
+		0 if not, -1 for NULL arguments or a context whose type
+		is not MSG_SOCKET.  errno is set to EINVAL on entry in
+		every case.
+ */
+static int
+sock_msg_fd_isset(msgctx_t *ctx, fd_set *fds)
+{
+	int fd;
+
+	errno = EINVAL;
+	if (fds == NULL || ctx == NULL || ctx->type != MSG_SOCKET)
+		return -1;
+
+	fd = ctx->u.local_info.sockfd;
+	if (fd < 0)
+		return 0;
+
+	return FD_ISSET(fd, fds) ? 1 : 0;
+}
+
+
+/**
+  Remove the context's socket from an fd_set.
+
+  @return	1 if the context has an open socket (which is then
+		cleared from fds), 0 if it has none, -1 for NULL
+		arguments or a context whose type is not MSG_SOCKET.
+		errno is set to EINVAL on entry in every case.
+ */
+int
+sock_msg_fd_clr(msgctx_t *ctx, fd_set *fds)
+{
+	int fd;
+
+	errno = EINVAL;
+	if (fds == NULL || ctx == NULL || ctx->type != MSG_SOCKET)
+		return -1;
+
+	fd = ctx->u.local_info.sockfd;
+	if (fd < 0)
+		return 0;
+
+	FD_CLR(fd, fds);
+	return 1;
+}
+
+
+/**
+  Read one framed message (local_msg_hdr_t followed by msg_len payload
+  bytes) from the context's socket.
+
+  @param ctx		Local-socket context.
+  @param msg		Destination buffer for the payload.
+  @param maxlen		Size of msg; a longer payload is cut to maxlen
+			(a warning is printed).
+  @param timeout	Seconds; when <= 0 a NULL timeout is passed to
+			_read_retry().
+  @return		The result of the payload _read_retry() call;
+			0 if the header read returned 0; -1 if the
+			header read failed or came back short.
+ */
+static int
+_local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	struct timeval tv = {0, 0};
+	struct timeval *p = NULL;
+	local_msg_hdr_t h;
+	int n;
+
+	if (timeout > 0) {
+		tv.tv_sec = timeout;
+		p = &tv;
+	}
+
+	n = _read_retry(ctx->u.local_info.sockfd, &h, sizeof(h), p);
+	if (n < 0)
+		return -1;
+	if (n == 0) {
+		/* Nothing was read (presumably the peer closed -- the
+		   caller reports 0 as ECONNRESET).  h is uninitialized,
+		   so it must not be looked at. */
+		return 0;
+	}
+	if ((size_t)n != sizeof(h)) {
+		/* Partial header: h.msg_len cannot be trusted */
+		errno = EIO;
+		return -1;
+	}
+
+	if (maxlen < h.msg_len) {
+		printf("WARNING: Buffer too small for message (%d vs %d)!\n",
+			h.msg_len, (int)maxlen);
+		/* NOTE(review): the bytes beyond maxlen are left unread on
+		   the socket and will be parsed as the next header --
+		   confirm whether they should be drained here. */
+		h.msg_len = maxlen;
+	}
+
+	return _read_retry(ctx->u.local_info.sockfd, msg, h.msg_len, p);
+}
+
+
+/**
+  Receive a message from a local-socket context.  The message is read
+  into a private 4096-byte buffer and then copied out to the
+  user-specified buffer.
+
+  @param ctx		Local-socket context; must be of type MSG_SOCKET
+			with SKF_READ set.
+  @param msg		Destination buffer.
+  @param maxlen		Size of msg in bytes (must be non-zero).
+  @param timeout	Passed through to _local_msg_receive().
+  @return		Number of bytes received (as reported by
+			_local_msg_receive()); at most maxlen of them
+			are copied into msg.  -1 on error: errno = EINVAL
+			for bad arguments, ECONNRESET if zero bytes were
+			received.
+ */
+static int
+sock_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	int req;
+	char priv_msg[4096];
+	size_t copy_len;
+
+	errno = EINVAL;
+	if (!ctx || !msg || !maxlen)
+		return -1;
+	if (ctx->type != MSG_SOCKET)
+		return -1;
+	if (!(ctx->flags & SKF_READ))
+		return -1;
+
+	req = _local_msg_receive(ctx, priv_msg, sizeof(priv_msg), timeout);
+
+	if (req == 0) {
+		errno = ECONNRESET;
+		return -1;
+	}
+
+	if (req < 0)
+		return -1;
+
+	/* Copy out.  Only the req bytes that were actually received are
+	   valid; copying the whole private buffer would hand the caller
+	   uninitialized stack contents. */
+	copy_len = ((size_t)req < maxlen ? (size_t)req : maxlen);
+
+	memcpy(msg, priv_msg, copy_len);
+	return req;
+}
+
+
+/**
+  Open a local-socket connection via sock_connect() (the socket in
+  /var/run/cluster...).  Only the local node (CMAN_NODEID_US) can be
+  reached this way.
+
+  @param type		Must be MSG_SOCKET.
+  @param nodeid		Must be CMAN_NODEID_US.
+  @param port		Not used by this implementation.
+  @param ctx		Context already initialized as MSG_SOCKET; its
+			sockfd is overwritten with sock_connect()'s
+			result.
+  @param timeout	Not used by this implementation.
+  @return		0 on success with SKF_READ|SKF_WRITE set on ctx;
+			-1 on bad arguments (errno = EINVAL) or when the
+			connect fails.
+ */
+int
+sock_msg_open(int type, int nodeid, int port, msgctx_t *ctx, int timeout)
+{
+	int fd;
+
+	errno = EINVAL;
+	if (ctx == NULL || ctx->type != MSG_SOCKET || type != MSG_SOCKET)
+		return -1;
+	if (nodeid != CMAN_NODEID_US)
+		return -1;
+
+	fd = sock_connect();
+	ctx->u.local_info.sockfd = fd;
+	if (fd < 0)
+		return -1;
+
+	ctx->flags = (SKF_READ | SKF_WRITE);
+	return 0;
+}
+
+
+/**
+  Close a local-socket context.  With a socket, the O/S cleans up the
+  buffers for us, so this only closes the descriptor and resets the
+  context (sockfd = -1, flags = 0, type = MSG_NONE).
+
+  @param ctx	Context to close.
+  @return	0 on success, -1 (errno = EINVAL) if ctx is NULL or is
+		not of type MSG_SOCKET.
+ */
+int
+sock_msg_close(msgctx_t *ctx)
+{
+	errno = EINVAL;
+	if (!ctx || ctx->type != MSG_SOCKET)
+		return -1;
+
+	/* A context that never got a socket has sockfd == -1 */
+	if (ctx->u.local_info.sockfd >= 0)
+		close(ctx->u.local_info.sockfd);
+	ctx->u.local_info.sockfd = -1;
+	ctx->flags = 0;
+	ctx->type = MSG_NONE;
+	return 0;
+}
+
+
+/**
+  Accept a new connection on a listening local-socket context.
+
+  acceptctx is first (re)initialized through the listen context's
+  mo_init operation, then receives the descriptor returned by accept().
+
+  @param listenctx	Listening context (open socket, SKF_LISTEN set).
+  @param acceptctx	Context to set up for the new connection.
+  @return		0 on success with SKF_READ|SKF_WRITE set on
+			acceptctx; -1 on bad arguments (errno = EINVAL)
+			or when accept() fails.
+ */
+static int
+sock_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
+{
+	int newfd;
+
+	errno = EINVAL;
+
+	if (listenctx == NULL || acceptctx == NULL)
+		return -1;
+	if (listenctx->u.local_info.sockfd < 0 ||
+	    !(listenctx->flags & SKF_LISTEN))
+		return -1;
+
+	listenctx->ops->mo_init(acceptctx);
+
+	newfd = accept(listenctx->u.local_info.sockfd, NULL, NULL);
+	acceptctx->u.local_info.sockfd = newfd;
+	if (newfd < 0)
+		return -1;
+
+	acceptctx->flags = (SKF_READ | SKF_WRITE);
+	return 0;
+}
+
+
+/**
+  Create a listening PF_LOCAL stream socket bound to the given path and
+  wrap it in a newly allocated message context.
+
+  Any existing file at the path is unlinked first, and the socket is
+  created under umask 077.
+
+  @param me		Not used by this implementation.
+  @param portp		NUL-terminated filesystem path for the socket.
+  @param listen_ctx	Receives the new listening context on success.
+  @return		0 on success; -1 on failure (errno = EINVAL for
+			NULL arguments, otherwise as left by the failing
+			call).
+ */
+int
+sock_msg_listen(int me, void *portp, msgctx_t **listen_ctx)
+{
+	int sock = -1;
+	int e;
+	struct sockaddr_un su;
+	mode_t om;
+	msgctx_t *ctx = NULL;
+	const char *path = (const char *)portp;
+
+	if (!path || !listen_ctx) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	/* Set up cluster context */
+	ctx = msg_new_ctx();
+	if (!ctx)
+		return -1;
+
+	sock = socket(PF_LOCAL, SOCK_STREAM, 0);
+	if (sock < 0)
+		goto fail;	/* ctx must be released as well */
+
+	/* Remove a stale socket at the path we are about to bind --
+	   not at some other, fixed location */
+	unlink(path);
+	om = umask(077);
+	memset(&su, 0, sizeof(su));
+	su.sun_family = PF_LOCAL;
+	/* Never use the caller's string as the format itself */
+	snprintf(su.sun_path, sizeof(su.sun_path), "%s", path);
+
+	if (bind(sock, (struct sockaddr *)&su, sizeof(su)) < 0) {
+		e = errno;
+		umask(om);
+		errno = e;
+		goto fail;
+	}
+	umask(om);
+
+	if (listen(sock, SOMAXCONN) < 0)
+		goto fail;
+
+	ctx->type = MSG_SOCKET;
+	ctx->u.local_info.sockfd = sock;
+	ctx->flags = SKF_LISTEN;
+	ctx->ops = &sock_msg_ops;
+	*listen_ctx = ctx;
+	return 0;
+fail:
+	/* Keep the errno of whatever failed across the cleanup calls */
+	e = errno;
+	free(ctx);
+	if (sock >= 0)
+		close(sock);
+	errno = e;
+	return -1;
+}
+
+
+/**
+  Initialize a message context as a local-socket context: zero it, set
+  its type and ops table and mark the socket descriptor unused (-1).
+
+  XXX INCOMPLETE - no local_ctx setup
+
+  @param ctx	Context to initialize.
+  @return	0 on success, -1 (errno = EINVAL) if ctx is NULL.
+ */
+int
+sock_msg_init(msgctx_t *ctx)
+{
+	/* Same NULL handling as cluster_msg_init() */
+	if (!ctx) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	memset(ctx,0,sizeof(*ctx));
+	ctx->type = MSG_SOCKET;
+	ctx->u.local_info.sockfd = -1;
+	ctx->ops = &sock_msg_ops;
+	return 0;
+}
+
+
+/**
+  Print a local-socket context (its descriptor) to stdout.  Does
+  nothing for a NULL context, like cluster_msg_print().
+ */
+void
+sock_msg_print(msgctx_t *ctx)
+{
+	if (!ctx)
+		return;
+
+	printf("Socket Message Context; fd = %d\n", ctx->u.local_info.sockfd);
+}
+
+
+/* XXX INCOMPLETE */
+/**
+  Shutdown hook for the local-socket transport.  Currently a no-op.
+
+  @return	Always 0.
+ */
+int
+sock_msg_shutdown(void)
+{
+	return 0;
+}
+
+
+/* Operations table for local-socket contexts: maps each generic mo_*
+   entry point onto the sock_msg_* implementation in this file. */
+static msg_ops_t sock_msg_ops = {
+	.mo_open = sock_msg_open,
+	.mo_close = sock_msg_close,
+	.mo_listen = sock_msg_listen,
+	.mo_accept = sock_msg_accept,
+	.mo_shutdown = sock_msg_shutdown,
+	.mo_wait = sock_msg_wait,
+	.mo_send = sock_msg_send,
+	.mo_receive = sock_msg_receive,
+	.mo_fd_set = sock_msg_fd_set,
+	.mo_fd_isset = sock_msg_fd_isset,
+	.mo_fd_clr = sock_msg_fd_clr,
+	.mo_print = sock_msg_print,
+	.mo_init = sock_msg_init
+};
/cvs/cluster/cluster/rgmanager/src/clulib/msgtest.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/msgtest.c
+++ -	2006-07-11 23:52:43.995452000 +0000
@@ -0,0 +1,286 @@
+/*
+  Copyright Red Hat, Inc. 2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge, 
+  MA 02139, USA.
+*/
+#include <message.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/select.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <cman-private.h>
+
+/* Port number used for every msg_open()/msg_listen() call in this test */
+#define MYPORT 190
+
+/* Local node ID; filled in by main() from cman_get_node() */
+int my_node_id = 0;
+/* Loop flag shared by main() and the worker threads; cleared to stop them.
+   NOTE(review): plain int accessed from several threads without
+   synchronization -- acceptable for a test program? */
+int running = 1;
+
+
+/* Signal handler that clears `running` so the loops in this program
+   terminate.  NOTE(review): main() in this file installs
+   sigusr2_handler for SIGTERM/SIGUSR2, not this function -- confirm
+   whether it is meant to be registered anywhere. */
+void
+sighandler(int sig)
+{
+	running = 0;
+}
+
+
+/**
+  Thread body: opens a cluster context to node 0 on MYPORT (reported as
+  the "mcast socket" on failure) and prints every message received on
+  it until `running` is cleared.
+
+  @param arg	Unused.
+  @return	Always NULL.
+ */
+void *
+piggyback(void *arg)
+{
+	msgctx_t ctx;
+	char buf[4096];
+
+	if (msg_open(MSG_CLUSTER, 0, MYPORT, &ctx, 0) != 0) {
+		printf("Could not set up mcast socket!\n");
+		return NULL;
+	}
+
+	printf("PIGGYBACK CONTEXT\n");
+	msg_print(&ctx);
+	printf("END PIGGYBACK CONTEXT\n");
+
+	while (running) {
+		/* Zero the buffer and keep its last byte out of reach of
+		   msg_receive() so that whatever arrives is always
+		   NUL-terminated before being printed with %s */
+		memset(buf, 0, sizeof(buf));
+		if (msg_receive(&ctx, buf, sizeof(buf) - 1, 2) > 0) {
+			printf("Piggyback received: %s\n", buf);
+		}
+	}
+
+	msg_close(&ctx);
+
+	printf("PIGGY flies...\n");
+
+	return NULL;
+}
+
+
+/**
+  Thread body: every 3 seconds, opens a pseudo-private channel to this
+  node (my_node_id), sends "Hello!" and prints the reply; then opens a
+  context to node 0, sends "Babble, babble" and prints anything
+  received.  Stops when `running` is cleared or a msg_open() fails.
+
+  @param arg	Unused.
+  @return	Always NULL.
+ */
+void *
+private(void *arg)
+{
+	msgctx_t ctx;
+	char buf[4096];
+
+	while (running) {
+		sleep(3);
+
+		/* use pseudoprivate channel */
+		if (msg_open(MSG_CLUSTER, my_node_id, MYPORT, &ctx, 1) != 0) {
+			printf("Could not set up virtual-socket!\n");
+			return NULL;
+		}
+
+		printf("=== pvt thread channel info ===\n");
+		msg_print(&ctx);
+		printf("=== end pvt thread channel info ===\n");
+		fflush(stdout);
+
+		snprintf(buf, sizeof(buf), "Hello!\n");
+		msg_send(&ctx, buf, strlen(buf)+1);
+
+		/* Zero the buffer and withhold its last byte so the reply
+		   is NUL-terminated no matter what the peer sent */
+		memset(buf, 0, sizeof(buf));
+		if (msg_receive(&ctx, buf, sizeof(buf) - 1, 10) > 0) {
+			printf("PRIVATE: Received %s\n", buf);
+			fflush(stdout);
+		}
+
+		msg_close(&ctx);
+
+		if (msg_open(MSG_CLUSTER, 0, MYPORT, &ctx, 1) != 0) {
+			printf("Could not set up mcast socket!\n");
+			return NULL;
+		}
+
+		snprintf(buf, sizeof(buf), "Babble, babble\n");
+		msg_send(&ctx, buf, strlen(buf)+1);
+
+		memset(buf, 0, sizeof(buf));
+		if (msg_receive(&ctx, buf, sizeof(buf) - 1, 1) > 0) {
+			printf("PRIVATE: Via MCAST %s\n", buf);
+			fflush(stdout);
+		}
+		msg_close(&ctx);
+	}
+
+	printf("Private thread is outta here...\n");
+
+	return NULL;
+}
+
+
+/**
+  Obtain a CMAN handle and block until the cluster is quorate.
+
+  Retries cman_init() once a second until it succeeds, then polls
+  cman_is_quorate() once a second for as long as it returns 0.  Exits
+  the process with status 1 if ch is NULL.
+
+  @param ch	Receives the CMAN handle.
+ */
+void
+clu_initialize(cman_handle_t **ch)
+{
+	if (ch == NULL)
+		exit(1);
+
+	*ch = cman_init(NULL);
+	if (*ch == NULL) {
+		printf("Waiting for CMAN to start\n");
+
+		for (;;) {
+			*ch = cman_init(NULL);
+			if (*ch != NULL)
+				break;
+			sleep(1);
+		}
+	}
+
+	if (cman_is_quorate(*ch))
+		return;
+
+	/*
+	   There are two ways to do this; this happens to be the simpler
+	   of the two.  The other method is to join with a NULL group
+	   and log in -- this will cause the plugin to not select any
+	   node group (if any exist).
+	 */
+	printf("Waiting for quorum to form\n");
+
+	while (cman_is_quorate(*ch) == 0)
+		sleep(1);
+
+	printf("Quorum formed, starting\n");
+}
+
+
+/**
+  Accept a pending private connection on the listening context, print
+  the first message received on it, answer with "Goodbye!" and close
+  the connection again.
+
+  @param ctx	Listening cluster context.
+  @return	0 once a connection was accepted, -1 if msg_accept()
+		failed.
+ */
+int
+side_message(msgctx_t *ctx)
+{
+	msgctx_t actx;
+	char buf[1024];
+
+	if (msg_accept(ctx, &actx) < 0)
+		return -1;
+
+	printf("=== MAIN: Handling side message ===\n");
+	msg_print(&actx);
+	fflush(stdout);
+
+	/* Zero the buffer and withhold its last byte so the data printed
+	   with %s below is always NUL-terminated */
+	memset(buf, 0, sizeof(buf));
+	if (msg_receive(&actx, buf, sizeof(buf) - 1, 10) > 0) {
+		printf("MAIN: Received %s\n", buf);
+		snprintf(buf, sizeof(buf), "Goodbye!\n");
+		msg_send(&actx, buf, strlen(buf)+1);
+	}
+
+	msg_close(&actx);
+
+	printf("=== MAIN: end side message ===\n");
+
+	return 0;
+}
+
+
+void
+malloc_dump_table(int, int);
+
+
+/* Deliberately empty signal handler.  main() installs it for SIGTERM
+   and SIGUSR2, so those signals are caught rather than getting their
+   default action. */
+void
+sigusr2_handler(int sig)
+{
+}
+
+/**
+  Test driver for the cluster messaging layer.
+
+  Connects to CMAN, starts listening on MYPORT, spawns the piggyback
+  and private threads, then multiplexes stdin and the cluster context:
+  lines typed on stdin are sent over the cluster context ('q'/'Q' or
+  end-of-file quits), incoming data is printed and M_OPEN requests are
+  handled by side_message().
+
+  @return	Does not return normally (calls exit(0)); returns -1 if
+		set-up fails.
+ */
+int
+main(int argc, char **argv)
+{
+	msgctx_t *cluster_ctx;
+	char recvbuf[128];
+	cman_node_t me;
+	int ret;
+	pthread_t piggy, priv;
+	fd_set rfds;
+	int max = 0;
+	int port = MYPORT;
+	cman_handle_t *clu = NULL;
+
+
+	clu_initialize(&clu);
+
+	if (cman_init_subsys(clu) < 0) {
+		perror("cman_init_subsys");
+		return -1;
+	}
+
+	/* Clear the structure first: cman_get_node() looks the node up by
+	   name instead of by ID when cn_name is not empty */
+	memset(&me, 0, sizeof(me));
+	if (cman_get_node(clu, CMAN_NODEID_US, &me) < 0) {
+		perror("cman_get_node");
+		return -1;
+	}
+
+	my_node_id = me.cn_nodeid;
+
+	if (msg_listen(MSG_CLUSTER, (void *)&port,
+	    me.cn_nodeid, &cluster_ctx) < 0) {
+		printf("Couldn't set up cluster message system\n");
+		return -1;
+	}
+
+	signal(SIGTERM, sigusr2_handler);
+	signal(SIGUSR2, sigusr2_handler);
+
+	pthread_create(&piggy, NULL, piggyback, NULL);
+	pthread_create(&priv, NULL, private, NULL);
+
+	msg_print(cluster_ctx);
+	while (running) {
+		max = 0;
+		FD_ZERO(&rfds);
+		FD_SET(STDIN_FILENO, &rfds);
+		msg_fd_set(cluster_ctx, &rfds, &max);
+
+		if (select(max+1, &rfds, NULL, NULL, NULL) < 0) {
+			/* rfds is not meaningful after a failed select();
+			   an interrupted call is simply retried */
+			if (errno == EINTR)
+				continue;
+			perror("select");
+			break;
+		}
+
+		if (FD_ISSET(STDIN_FILENO, &rfds)) {
+			/* EOF or read error on stdin: quit instead of
+			   re-sending stale buffer contents forever */
+			if (!fgets(recvbuf, sizeof(recvbuf), stdin))
+				break;
+			if (recvbuf[0] == 'q' || recvbuf[0] == 'Q')
+				break;
+			if (msg_send(cluster_ctx, recvbuf,
+			    strlen(recvbuf)+1) < 0)
+				perror("msg_send");
+			FD_CLR(STDIN_FILENO, &rfds);
+		}
+
+		if (!msg_fd_isset(cluster_ctx, &rfds))
+			continue;
+
+		ret = msg_wait(cluster_ctx, 1);
+
+		switch(ret) {
+		case M_DATA:
+			/* Zero the buffer and withhold its last byte so the
+			   data is NUL-terminated when printed */
+			memset(recvbuf, 0, sizeof(recvbuf));
+			msg_receive(cluster_ctx, recvbuf,
+				    sizeof(recvbuf) - 1, 10);
+			printf("MAIN: received %s\n", recvbuf);
+			break;
+		case M_OPEN:
+			printf("MAIN: private connection detected\n");
+			side_message(cluster_ctx);
+			break;
+		case 0:
+			/* No data; probably a control msg */
+			break;
+		default:
+			printf("Cluster EV: %d\n", ret);
+			/* Cluster events, etc. */
+			msg_receive(cluster_ctx, recvbuf,
+				    sizeof(recvbuf), 0);
+		}
+	}
+
+	printf("Shutting down...\n");
+
+	running = 0;
+
+	pthread_join(piggy, NULL);
+	pthread_join(priv, NULL);
+
+	msg_close(cluster_ctx);
+	msg_free_ctx(cluster_ctx);
+	msg_shutdown();
+
+	cman_finish(clu);
+
+	malloc_dump_table(0, 1024);
+
+	exit(0);
+}
--- cluster/rgmanager/src/clulib/Makefile	2006/06/02 17:37:10	1.8
+++ cluster/rgmanager/src/clulib/Makefile	2006/07/11 23:52:41	1.9
@@ -21,7 +21,7 @@
 CFLAGS+= -g -Wstrict-prototypes -Wshadow -fPIC -D_GNU_SOURCE
 
 
-TARGETS=libclulib.a liblalloc.a
+TARGETS=libclulib.a liblalloc.a msgtest 
 
 all: ${TARGETS}
 
@@ -29,9 +29,12 @@
 
 uninstall:
 
+msgtest: msgtest.o libclulib.a
+	gcc -o msgtest msgtest.o -lcman -L. -lclulib -llalloc -lpthread
+
 libclulib.a: clulog.o daemon_init.o signals.o msgsimple.o \
-		gettid.o rg_strings.o message.o members.o fdops.o
-		# lock.o lockspace.o 
+		gettid.o rg_strings.o message.o members.o fdops.o \
+		lock.o cman.o vft.o msg_cluster.o msg_socket.o
 	${AR} cru $@ $^
 	ranlib $@
 
--- cluster/rgmanager/src/clulib/alloc.c	2006/01/20 16:27:24	1.7
+++ cluster/rgmanager/src/clulib/alloc.c	2006/07/11 23:52:41	1.8
@@ -129,8 +129,9 @@
 #undef  AGGR_RECLAIM		/* consolidate_all on free (*slow*) */
 
 #undef  STACKSIZE	/*4	   backtrace to store if DEBUG is set */
+//#define STACKSIZE 4
 
-#undef  GDB_HOOK		/* Dump program addresses in malloc_table
+#undef	GDB_HOOK		/* Dump program addresses in malloc_table
 				   using a fork/exec of gdb (SLOW but fun)
 				   building this defeats the purpose of
 				   a bounded memory allocator, and is only
--- cluster/rgmanager/src/clulib/lock.c	2006/06/13 19:22:38	1.1
+++ cluster/rgmanager/src/clulib/lock.c	2006/07/11 23:52:41	1.2
@@ -31,6 +31,10 @@
 #include <sys/select.h>
 #include <pthread.h>
 
+/* Default lockspace stuff */
+static dlm_lshandle_t _default_ls = NULL;
+static pthread_mutex_t _default_lock = PTHREAD_MUTEX_INITIALIZER;
+
 
 static void
 ast_function(void * __attribute__ ((unused)) arg)
@@ -39,7 +43,7 @@
 
 
 static int
-wait_for_dlm_event(dlm_lshandle_t *ls)
+wait_for_dlm_event(dlm_lshandle_t ls)
 {
 	fd_set rfds;
 	int fd = dlm_ls_get_fd(ls);
@@ -55,15 +59,18 @@
 
 
 int
-clu_lock(dlm_lshandle_t ls,
-	 int mode,
-	 struct dlm_lksb *lksb,
-	 int options,
-         char *resource)
+clu_ls_lock(dlm_lshandle_t ls,
+	    int mode,
+	    struct dlm_lksb *lksb,
+	    int options,
+            char *resource)
 {
         int ret;
 
 	if (!ls || !lksb || !resource || !strlen(resource)) {
+		printf("%p %p %p %d\n", ls, lksb, resource,
+		       (int)strlen(resource));
+		printf("INVAL...\n");
 		errno = EINVAL;
 		return -1;
 	}
@@ -91,15 +98,22 @@
 
 
 dlm_lshandle_t
-clu_acquire_lockspace(const char *lsname)
+clu_open_lockspace(const char *lsname)
 {
         dlm_lshandle_t ls = NULL;
 
+	//printf("opening lockspace %s\n", lsname);
+
         while (!ls) {
                 ls = dlm_open_lockspace(lsname);
                 if (ls)
                         break;
 
+		/*
+		printf("Failed to open: %s; trying create.\n",
+		       strerror(errno));
+		 */
+
                 ls = dlm_create_lockspace(lsname, 0644);
                 if (ls)
                         break;
@@ -119,9 +133,8 @@
 }
 
 
-
 int
-clu_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb)
+clu_ls_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb)
 {
         int ret;
 
@@ -147,7 +160,146 @@
 
 
 int
-clu_release_lockspace(dlm_lshandle_t ls, char *name)
+clu_close_lockspace(dlm_lshandle_t ls, const char *name)
 {
         return dlm_release_lockspace(name, ls, 0);
 }
+
+
+/**
+  Take a lock on `resource` in the default lockspace (_default_ls).
+
+  Unless the caller passes LKF_CONVERT, a blocking request (one without
+  LKF_NOQUEUE) or an explicit LKM_NLMODE request first takes a NULL
+  lock.  A blocking request is then carried out as a series of
+  LKF_NOQUEUE attempts with a random sleep in between while the attempt
+  fails with EAGAIN.
+
+  @param mode		Requested lock mode.
+  @param lksb		Lock status block handed to clu_ls_lock().
+  @param options	LKF_* flags.
+  @param resource	Name of the resource to lock.
+  @return		0 on success; otherwise -1 or the non-zero
+			result of the last clu_ls_lock() call.
+ */
+int
+clu_lock(int mode,
+	 struct dlm_lksb *lksb,
+	 int options,
+         char *resource)
+{
+	int ret = 0, block = 0, conv = 0, err;
+
+	block = !(options & LKF_NOQUEUE);
+
+	/*
+	   Try to use a conversion lock mechanism when possible
+	   If the caller calls explicitly with a NULL lock, then
+	   assume the caller knows what it is doing.
+
+	   Only take the NULL lock if:
+	   (a) the user isn't specifying CONVERT; if they are, they
+	       know what they are doing.
+
+	   ...and one of...
+
+	   (b) This is a blocking call, or
+	   (c) The user requested a NULL lock explicitly.  In this case,
+	       short-out early; there's no reason to convert a NULL lock
+	       to a NULL lock.
+	 */
+	if (!(options & LKF_CONVERT) &&
+	    (block || (mode == LKM_NLMODE))) {
+		/* Acquire NULL lock */
+		pthread_mutex_lock(&_default_lock);
+		ret = clu_ls_lock(_default_ls, LKM_NLMODE, lksb,
+				  (options & ~LKF_NOQUEUE),
+				  resource);
+		/* Grab errno before the unlock below can disturb it */
+		err = errno;
+		pthread_mutex_unlock(&_default_lock);
+		if (ret == 0) {
+			if (mode == LKM_NLMODE) {
+				/* User only wanted a NULL lock... */
+				return 0;
+			}
+			/*
+			   Ok, NULL lock was taken, rest of blocking
+			   call should be done using lock conversions.
+			 */
+			options |= LKF_CONVERT;
+			conv = 1;
+		} else {
+			switch(err) {
+			case EINVAL:
+				/* Oops, null locks don't work on this
+				   plugin; use normal spam mode */
+				break;
+			default:
+				errno = err;
+				return -1;
+			}
+		}
+	}
+
+	/* Every attempt is made with LKF_NOQUEUE; a blocking request is
+	   emulated by retrying after a random sleep while the attempt
+	   fails with EAGAIN */
+	while (1) {
+		pthread_mutex_lock(&_default_lock);
+		ret = clu_ls_lock(_default_ls, mode, lksb,
+				  (options | LKF_NOQUEUE),
+				  resource);
+		err = errno;
+		pthread_mutex_unlock(&_default_lock);
+
+		if ((ret != 0) && (err == EAGAIN) && block) {
+			usleep(random()&32767);
+			continue;
+		}
+
+		break;
+	}
+
+	if (ret != 0 && conv) {
+		/* If we get some other error, release the NL lock we
+		 took so we don't leak locks*/
+		pthread_mutex_lock(&_default_lock);
+		clu_ls_unlock(_default_ls, lksb);
+		pthread_mutex_unlock(&_default_lock);
+		errno = err;
+	}
+
+	return ret;
+}
+
+
+/**
+  Release a lock held in the default lockspace, then sleep for a random
+  interval (random() & 32767 microseconds).
+
+  @param lksb	Lock status block of the lock to release.
+  @return	The result of clu_ls_unlock(); errno is restored to the
+		value it had right after that call.
+ */
+int
+clu_unlock(struct dlm_lksb *lksb)
+{
+	int rv;
+	int saved_errno;
+
+	pthread_mutex_lock(&_default_lock);
+	rv = clu_ls_unlock(_default_ls, lksb);
+	saved_errno = errno;
+	pthread_mutex_unlock(&_default_lock);
+
+	usleep(random()&32767);
+
+	errno = saved_errno;
+	return rv;
+}
+
+
+/**
+  Open the default lockspace used by clu_lock()/clu_unlock().  A second
+  call while a default lockspace is already open does nothing.
+
+  @param dflt_lsname	Name of the lockspace to open.
+  @return		0 if the lockspace is (already) open; -1 with
+			errno = EINVAL for a NULL or empty name; 1 if
+			clu_open_lockspace() returned NULL (errno as
+			left by that call).
+ */
+int
+clu_lock_init(const char *dflt_lsname)
+{
+	int rv, saved_errno;
+
+	pthread_mutex_lock(&_default_lock);
+
+	if (_default_ls != NULL) {
+		/* Already initialized; errno is left untouched */
+		pthread_mutex_unlock(&_default_lock);
+		return 0;
+	}
+
+	if (dflt_lsname == NULL || dflt_lsname[0] == '\0') {
+		rv = -1;
+		saved_errno = EINVAL;
+	} else {
+		_default_ls = clu_open_lockspace(dflt_lsname);
+		rv = (_default_ls == NULL);
+		saved_errno = errno;
+	}
+
+	pthread_mutex_unlock(&_default_lock);
+
+	errno = saved_errno;
+	return rv;
+}
+
+/**
+  Close the default lockspace, if one is open.
+
+  @param name	Lockspace name handed to clu_close_lockspace().
+ */
+void
+clu_lock_finished(const char *name)
+{
+	pthread_mutex_lock(&_default_lock);
+	if (_default_ls) {
+		/* Forget the handle once it has been closed; otherwise
+		   later clu_lock()/clu_unlock() calls would use a stale
+		   handle and clu_lock_init() would refuse to reopen */
+		if (clu_close_lockspace(_default_ls, name) == 0)
+			_default_ls = NULL;
+	}
+	pthread_mutex_unlock(&_default_lock);
+}
--- cluster/rgmanager/src/clulib/lockspace.c	2006/06/13 19:22:38	1.1
+++ cluster/rgmanager/src/clulib/lockspace.c	2006/07/11 23:52:41	1.2
@@ -1,3 +1,21 @@
+/*
+  Copyright Red Hat, Inc. 2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge, 
+  MA 02139, USA.
+*/
 #include <stdlib.h>
 #include <unistd.h>
 #include <string.h>
--- cluster/rgmanager/src/clulib/members.c	2006/06/13 19:22:38	1.1
+++ cluster/rgmanager/src/clulib/members.c	2006/07/11 23:52:41	1.2
@@ -1,3 +1,21 @@
+/*
+  Copyright Red Hat, Inc. 2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge, 
+  MA 02139, USA.
+*/
 #include <sys/types.h>
 #include <arpa/inet.h>
 #include <stdint.h>
@@ -8,10 +26,12 @@
 #include <members.h>
 #include <stdlib.h>
 #include <unistd.h>
+#include <signal.h>
 #include <string.h>
 #include <sys/socket.h>
 #include <rg_types.h>
 #include <pthread.h>
+#include <errno.h>
 
 static int my_node_id = -1;
 static pthread_rwlock_t memblock = PTHREAD_RWLOCK_INITIALIZER;
@@ -73,29 +93,32 @@
 {
 	int count, x, y;
 	char in_old = 0;
-	cluster_member_list_t *gained;
+	cluster_member_list_t *gained = NULL;
 
 	/* No nodes in new?  Then nothing could have been gained */
 	if (!new || !new->cml_count)
 		return NULL;
 
 	/* Nothing in old?  Duplicate 'new' and return it. */
-	if (!old || !old->cml_count) {
-		gained = cml_alloc(new->cml_count);
-		if (!gained)
-			return NULL;
-		memcpy(gained, new, cml_size(new->cml_count));
-		return gained;
-	}
+	if (!old || !old->cml_count)
+		return member_list_dup(new);
 
 	/* Use greatest possible count */
 	count = (old->cml_count > new->cml_count ?
-		 cml_size(old->cml_count) : cml_size(new->cml_count));
+		 old->cml_count : new->cml_count);
+	count *= sizeof(cman_node_t);
 
-	gained = malloc(count);
+	gained = malloc(sizeof(cluster_member_list_t));
 	if (!gained)
 		return NULL;
-	memset(gained, 0, count);
+	memset(gained, 0, sizeof(*gained));
+
+	gained->cml_members = malloc(count);
+	if (!gained->cml_members) {
+		free(gained);
+		return NULL;
+	}
+	memset(gained->cml_members, 0, count);
 
 	for (x = 0; x < new->cml_count; x++) {
 
@@ -121,6 +144,7 @@
 	}
 
 	if (gained->cml_count == 0) {
+		free(gained->cml_members);
 		free(gained);
 		gained = NULL;
 	}
@@ -145,7 +169,7 @@
 cluster_member_list_t *
 memb_lost(cluster_member_list_t *old, cluster_member_list_t *new)
 {
-	cluster_member_list_t *ret;
+	cluster_member_list_t *ret = NULL;
 	int x;
 
 	/* Reverse. ;) */
@@ -213,26 +237,42 @@
 get_member_list(cman_handle_t h)
 {
 	int c;
+	int tries = 0, local = 0;
 	cluster_member_list_t *ml = NULL;
 	cman_node_t *nodes = NULL;
 
-	do {
+	if (h == NULL) {
+		local = 1;
+		h = cman_init(NULL);
+		if (!h)
+			return NULL;
+	}
+
+	do {	
+		++tries;
 		if (nodes)
 			free(nodes);
 
 		c = cman_get_node_count(h);
-		if (c <= 0)
-			return NULL;
+		if (c <= 0) {
+			if (errno == EINTR)
+				continue;
+			if (ml)
+				free(ml);
+			ml = NULL;
+			goto out;
+		}
 
 		if (!ml)
 			ml = malloc(sizeof(*ml));
 		if (!ml)
-			return NULL;
+			goto out;
 
 		nodes = malloc(sizeof(*nodes) * c);
 		if (!nodes) {
 			free(ml);
-			return NULL;
+			ml = NULL;
+			goto out;
 		}
 
 		memset(ml, 0, sizeof(*ml));
@@ -244,6 +284,10 @@
 
 	ml->cml_members = nodes;
 	ml->cml_count = c;
+
+out:
+	if (local)
+		cman_finish(h);
 	return ml;
 }
 
@@ -264,6 +308,9 @@
 {
 	int x = 0;
 
+	if (!ml)
+		return 0;
+
 	for (x = 0; x < ml->cml_count; x++) {
 		if (ml->cml_members[x].cn_nodeid == nodeid)
 			return ml->cml_members[x].cn_member;
@@ -278,6 +325,9 @@
 {
 	int x = 0, count = 0;
 
+	if (!ml)
+		return 0;
+
 	for (x = 0; x < ml->cml_count; x++) {
 		if (ml->cml_members[x].cn_member)
 			++count;
@@ -292,6 +342,9 @@
 {
 	int x = 0;
 
+	if (!ml)
+		return -1;
+
 	for (x = 0; x < ml->cml_count; x++) {
 		if (ml->cml_members[x].cn_nodeid == nodeid)
 			ml->cml_members[x].cn_member = 0;
@@ -306,13 +359,15 @@
 memb_id_to_name(cluster_member_list_t *ml, int nodeid)
 {
 	int x = 0;
+	if (!ml)
+		return NULL;
 
 	for (x = 0; x < ml->cml_count; x++) {
 		if (ml->cml_members[x].cn_nodeid == nodeid)
 			return ml->cml_members[x].cn_name;
 	}
 
-	return 0;
+	return NULL;
 }
 
 
@@ -320,13 +375,15 @@
 memb_id_to_p(cluster_member_list_t *ml, int nodeid)
 {
 	int x = 0;
+	if (!ml)
+		return NULL;
 
 	for (x = 0; x < ml->cml_count; x++) {
 		if (ml->cml_members[x].cn_nodeid == nodeid)
 			return &ml->cml_members[x];
 	}
 
-	return 0;
+	return NULL;
 }
 
 
@@ -334,6 +391,8 @@
 memb_online_name(cluster_member_list_t *ml, char *name)
 {
 	int x = 0;
+	if (!ml)
+		return 0;
 
 	for (x = 0; x < ml->cml_count; x++) {
 		if (!strcasecmp(ml->cml_members[x].cn_name, name))
@@ -348,6 +407,8 @@
 memb_name_to_id(cluster_member_list_t *ml, char *name)
 {
 	int x = 0;
+	if (!ml)
+		return 0;
 
 	for (x = 0; x < ml->cml_count; x++) {
 		if (!strcasecmp(ml->cml_members[x].cn_name, name))
@@ -362,13 +423,15 @@
 memb_name_to_p(cluster_member_list_t *ml, char *name)
 {
 	int x = 0;
+	if (!ml)
+		return NULL;
 
 	for (x = 0; x < ml->cml_count; x++) {
 		if (!strcasecmp(ml->cml_members[x].cn_name, name))
 			return &ml->cml_members[x];
 	}
 
-	return 0;
+	return NULL;
 }
 
 /**
@@ -388,9 +451,19 @@
 	if (!orig)
 		return NULL;
 
-	ret = malloc(cml_size(orig->cml_count));
-	memset(ret, 0, cml_size(orig->cml_count));
-	memcpy(ret, orig, cml_size(orig->cml_count));
+	ret = malloc(sizeof(cluster_member_list_t));
+	if (!ret)
+		return NULL;
+	memset(ret, 0, sizeof(cluster_member_list_t));
+	ret->cml_members = malloc(sizeof(cman_node_t) * orig->cml_count);
+
+	if (!ret->cml_members) {
+		free(ret);
+		return NULL;
+	}
+	ret->cml_count = orig->cml_count;
+	memcpy(ret->cml_members, orig->cml_members,
+	       orig->cml_count * sizeof(cman_node_t));
 
 	return ret;
 }
--- cluster/rgmanager/src/clulib/message.c	2006/06/13 19:22:38	1.1
+++ cluster/rgmanager/src/clulib/message.c	2006/07/11 23:52:41	1.2
@@ -1,138 +1,41 @@
+/*
+  Copyright Red Hat, Inc. 2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge, 
+  MA 02139, USA.
+*/
 #define _MESSAGE_BUILD
 #include <message.h>
 #include <stdio.h>
-#include <pthread.h>
-#include <libcman.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <string.h>
 #include <sys/types.h>
 #include <stdlib.h>
-#include <rg_types.h>
-#include <sys/socket.h>
 #include <sys/stat.h>
-#include <sys/un.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
 #include <errno.h>
-#include <sys/time.h>
-#include <fdops.h>
-#include <resgroup.h>
-
-
-
-/* Ripped from ccsd's setup_local_socket */
-#define RGMGR_SOCK "/var/run/cluster/rgmanager.sk"
-#define MAX_CONTEXTS 32  /* Testing; production should be 1024-ish */
-
-/* Context 0 is reserved for control messages */
-
-/* Local-ish contexts */
-static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER;
-static msgctx_t *contexts[MAX_CONTEXTS];
-static uint32_t context_index = 1;
-static chandle_t *gch;
-pthread_t comms_thread;
-int thread_running;
-
-
-#define is_established(ctx) \
-	(((ctx->type == MSG_CLUSTER) && \
- 	  (ctx->u.cluster_info.remote_ctx && ctx->u.cluster_info.local_ctx)) || \
-	 ((ctx->type == MSG_SOCKET) && \
-	  (ctx->u.local_info.sockfd != -1)))
-
-
-static int
-local_connect(void)
-{
-	struct sockaddr_un sun;
-	int sock = -1, error = 0;
-
-	memset(&sun, 0, sizeof(sun));
-	sun.sun_family = PF_LOCAL;
-	snprintf(sun.sun_path, sizeof(sun.sun_path), RGMGR_SOCK);
-
-	sock = socket(PF_LOCAL, SOCK_STREAM, 0);
-	if (sock < 0) {
-		error = errno;
-		goto fail;
-	}
-
-	error = connect(sock, (struct sockaddr *)&sun, sizeof(sun));
-	if (error < 0) {
-		error = errno;
-		goto fail;
-	}
-
-	sock = error;
-fail:
-
-	return sock;
-}
-
-
-static int
-send_cluster_message(msgctx_t *ctx, void *msg, size_t len)
-{
-	char buf[4096];
-	cluster_msg_hdr_t *h = (void *)buf;
-	int ret;
-	char *msgptr = (buf + sizeof(*h));
-
-	if ((len + sizeof(*h)) > sizeof(buf)) {
-		errno = E2BIG;
-		return -1;
-	}
-
-	h->msg_control = M_DATA;
-	h->src_ctx = ctx->u.cluster_info.local_ctx;
-	h->dest_ctx = ctx->u.cluster_info.remote_ctx;
-	h->msg_port = ctx->u.cluster_info.port;
-	memcpy(msgptr, msg, len);
 
-	/*
-	printf("sending cluster message, length = %d to nodeid %d port %d\n",
-	       len + sizeof(*h), ctx->u.cluster_info.nodeid, ctx->u.cluster_info.port);
-	 */
-
-	pthread_mutex_lock(&gch->c_lock);
-	h->src_nodeid = gch->c_nodeid;
-
-	swab_cluster_msg_hdr_t(h);
-
-	ret = cman_send_data(gch->c_cluster, (void *)h, len + sizeof(*h),
-			       ctx->u.cluster_info.nodeid,
-			       ctx->u.cluster_info.port, 0);
-
-	pthread_mutex_unlock(&gch->c_lock);
-
-	return len + sizeof(h);
-}
-
-
-/**
-  Wrapper around write(2)
- */
-static int
-send_socket_message(msgctx_t *ctx, void *msg, size_t len)
-{
-	char buf[4096];
-	local_msg_hdr_t *h = (local_msg_hdr_t *)buf;
-	char *msgptr = (buf + sizeof(*h));
-
-	/* encapsulate ... ? */
-	if ((len + sizeof(*h)) > sizeof(buf)) {
-		errno = E2BIG;
-		return -1;
-	}
-
-	h->msg_control = M_DATA;
-	h->msg_len = len;
-	memcpy(msgptr, msg, len);
-
-	return _write_retry(ctx->u.local_info.sockfd, msg, len + sizeof(*h), NULL);
-}
+/* From msg_cluster */
+int cluster_msg_init(msgctx_t *ctx);
+int cluster_msg_listen(int me, void *, msgctx_t **ctx);
+int cluster_msg_shutdown(void);
+
+/* From msg_socket  */
+int sock_msg_init(msgctx_t *ctx);
+int sock_msg_listen(int me, void *, msgctx_t **ctx);
+int sock_msg_shutdown(void);
 
 
 /**
@@ -142,238 +45,22 @@
 int
 msg_send(msgctx_t *ctx, void *msg, size_t len)
 {
-	if (!ctx || !msg || !len) {
-		errno = EINVAL;
-		return -1;
-	}
-
-	switch(ctx->type) {
-	case MSG_CLUSTER:
-		return send_cluster_message(ctx, msg, len);
-	case MSG_SOCKET:
-		return send_socket_message(ctx, msg, len);
-	default:
-		break;
-	}
-
 	errno = EINVAL;
-	return -1;
-}
-
-
-/**
-  Assign a (free) cluster context ID
- */
-static int
-assign_ctx(msgctx_t *ctx)
-{
-	int start;
-
-	/* Assign context index */
-	ctx->type = MSG_CLUSTER;
-
-	pthread_mutex_lock(&context_lock);
-	start = context_index++;
-	if (context_index >= MAX_CONTEXTS || context_index <= 0)
-		context_index = 1;
-	do {
-		if (contexts[context_index]) {
-			++context_index;
-			if (context_index >= MAX_CONTEXTS)
-				context_index = 1;
-
-			if (context_index == start) {
-				pthread_mutex_unlock(&context_lock);
-				errno = EAGAIN;
-				return -1;
-			}
-
-			continue;
-		}
-
-		contexts[context_index] = ctx;
-		ctx->u.cluster_info.local_ctx = context_index;
-
-	} while (0);
-	pthread_mutex_unlock(&context_lock);
-
-	return 0;
-}
-
-
-/* See if anything's on the cluster socket.  If so, dispatch it
-   on to the requisite queues
-   XXX should be passed a connection arg! */
-static int
-poll_cluster_messages(int timeout)
-{
-	int ret = -1;
-	fd_set rfds;
-	int fd;
-	struct timeval tv;
-	struct timeval *p = NULL;
-
-	if (timeout >= 0) {
-		p = &tv;
-		tv.tv_sec = tv.tv_usec = timeout;
-	}
-	printf("%s\n", __FUNCTION__);
-
-	FD_ZERO(&rfds);
-
-	//pthread_mutex_lock(&gch->c_lock);
-	fd = cman_get_fd(gch->c_cluster);
-	FD_SET(fd, &rfds);
-
-	if (select(fd + 1, &rfds, NULL, NULL, p) == 1) {
-		cman_dispatch(gch->c_cluster, 0);
-		ret = 0;
-	}
-	//pthread_mutex_unlock(&gch->c_lock);
-
-	return ret;
-}
-
-
-/**
-  This is used to establish and tear down pseudo-private
-  contexts which are shared with the cluster context.
- */
-static int
-cluster_send_control_msg(msgctx_t *ctx, int type)
-{
-	cluster_msg_hdr_t cm;
-
-	cm.msg_control = (uint8_t)type;
-	cm.src_nodeid = gch->c_nodeid;
-	cm.dest_ctx = ctx->u.cluster_info.remote_ctx;
-	cm.src_ctx = ctx->u.cluster_info.local_ctx;
-	cm.msg_port = ctx->u.cluster_info.port;
-
-	swab_cluster_msg_hdr_t(&cm);
-
-	return (cman_send_data(gch->c_cluster, (void *)&cm, sizeof(cm),
-			       ctx->u.cluster_info.nodeid,
-			       ctx->u.cluster_info.port, 0));
-}
-
-
-/**
-  Wait for a message on a context.
- */
-static int
-cluster_msg_wait(msgctx_t *ctx, int timeout)
-{
-	struct timespec ts = {0, 0};
-	int req = M_NONE;
-	struct timeval start;
-	struct timeval now;
-	
-
-	if (timeout > 0)
-		gettimeofday(&start, NULL);
-
-	ts.tv_sec = !!timeout;
-
-	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
-	while (1) {
-		/* See if we dispatched any messages on to our queue */
-		if (ctx->u.cluster_info.queue) {
-			req = ctx->u.cluster_info.queue->message->msg_control;
-			/*printf("Queue not empty CTX%d : %d\n",
-			  	 ctx->u.cluster_info.local_ctx, req);*/
-			break;
-		}
-
-		if (timeout == 0)
-			break;
-
-		/* Ok, someone else has the mutex on our FD.  Go to
-	   	   sleep on a cond; maybe they'll wake us up */
-		if (pthread_cond_timedwait(&ctx->u.cluster_info.cond,
-		    			   &ctx->u.cluster_info.mutex,
-		   			   &ts) < 0) {
-
-			/* Mutex held */
-			if (errno == ETIMEDOUT) {
-				if (timeout < 0) {
-					ts.tv_sec = 1;
-					ts.tv_nsec = 0;
-					continue;
-				} 
-
-				ts.tv_sec = !!timeout;
-
-				/* Done */
-				break;
-			}
-		}
-
-		if (timeout > 0) {
-			gettimeofday(&now, NULL);
-			/* XXX imprecise */
-			if (now.tv_sec - start.tv_sec > timeout)
-				break;
-		}
-	}
-	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-
-	return req;
-}
-
-
-static int
-peekypeeky(int fd)
-{
-	local_msg_hdr_t h;
-	int ret;
-
-	while ((ret = recv(fd, (void *)&h, sizeof(h), MSG_PEEK)) < 0) {
-		if (errno == EINTR)
-			continue;
+	if (!ctx || !msg || !len)
 		return -1;
-	}
-
-	if (ret == sizeof(h))
-		return h.msg_control;
-
-	if (ret == 0)
-		/* Socket closed? */
-		return M_CLOSE;
 
-	/* XXX */
-	printf("PROTOCOL ERROR: Invalid message\n");
-	return M_CLOSE;
-}
-
-
-static int
-local_msg_wait(msgctx_t *ctx, int timeout)
-{
-	fd_set rfds;
-	struct timeval tv = {0, 0};
-	struct timeval *p = NULL;
-
-	if (timeout >= 0) {
-		tv.tv_sec = timeout;
-		p = &tv;
-	}
-
-	FD_ZERO(&rfds);
-	FD_SET(ctx->u.local_info.sockfd, &rfds);
-
-	if (_select_retry(ctx->u.local_info.sockfd + 1, &rfds,
-			  NULL, NULL, p) == 1) {
-		return peekypeeky(ctx->u.local_info.sockfd);
-	}
-
-	return M_NONE;
+	if (ctx->ops && ctx->ops->mo_send)
+		return ctx->ops->mo_send(ctx, msg, len);
+	errno = ENOSYS;
+	return -1;
 }
 
 
+/* XXX get API for this ready */
 int
 msg_get_nodeid(msgctx_t *ctx)
 {
+	/* XXX */
 	switch(ctx->type) {
 	case MSG_CLUSTER:
 		return ctx->u.cluster_info.nodeid;
@@ -390,49 +77,13 @@
 int
 msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max)
 {
-	int e;
-	switch(ctx->type) {
-	case MSG_CLUSTER:
-		pthread_mutex_lock(&ctx->u.cluster_info.mutex);
-		if (ctx->u.cluster_info.select_pipe[0] < 0) {
-			if (pipe(ctx->u.cluster_info.select_pipe) < 0) {
-				e = errno;
-				pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-				errno = e;
-				return -1;
-			}
-
-			printf("%s: Created cluster CTX select pipe "
-			       "rd=%d wr=%d\n", __FUNCTION__,
-			       ctx->u.cluster_info.select_pipe[0],
-			       ctx->u.cluster_info.select_pipe[1]);
-
-		}
-
-		e = ctx->u.cluster_info.select_pipe[0];
-		printf("%s: cluster %d\n", __FUNCTION__,  e);
-		FD_SET(e, fds);
-		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-
-		if (e > *max)
-			*max = e;
-		return 0;
-
-	case MSG_SOCKET:
-		if (ctx->u.local_info.sockfd >= 0) {
-			printf("%s: local %d\n", __FUNCTION__,
-			       ctx->u.local_info.sockfd);
-			FD_SET(ctx->u.local_info.sockfd, fds);
-
-			if (ctx->u.local_info.sockfd > *max)
-				*max = ctx->u.local_info.sockfd;
-			return 0;
-		}
+	errno = EINVAL;
+	if (!ctx)
 		return -1;
-	default:
-		break;
-	}
-
+	
+	if (ctx->ops && ctx->ops->mo_fd_set)
+		return ctx->ops->mo_fd_set(ctx, fds, max);
+	errno = ENOSYS;
 	return -1;
 }
 
@@ -441,30 +92,12 @@
 msg_fd_isset(msgctx_t *ctx, fd_set *fds)
 {
 	errno = EINVAL;
-
-	if (!fds || !ctx)
+	if (!ctx)
 		return -1;
-
-	switch(ctx->type) {
-	case MSG_CLUSTER:
-		pthread_mutex_lock(&ctx->u.cluster_info.mutex);
-		if (ctx->u.cluster_info.select_pipe[0] >= 0 &&
-		    FD_ISSET(ctx->u.cluster_info.select_pipe[0], fds)) {
-			pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-			return 1;
-		}
-		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-		return 0;
-	case MSG_SOCKET:
-		if (ctx->u.local_info.sockfd >= 0 &&
-		    FD_ISSET(ctx->u.local_info.sockfd, fds)) {
-			return 1;
-		}
-		return 0;
-	default:
-		break;
-	}
-
+	
+	if (ctx->ops && ctx->ops->mo_fd_isset)
+		return ctx->ops->mo_fd_isset(ctx, fds);
+	errno = ENOSYS;
 	return -1;
 }
 
@@ -473,30 +106,12 @@
 msg_fd_clr(msgctx_t *ctx, fd_set *fds)
 {
 	errno = EINVAL;
-
-	if (!fds || !ctx)
+	if (!ctx)
 		return -1;
-
-	switch(ctx->type) {
-	case MSG_CLUSTER:
-		pthread_mutex_lock(&ctx->u.cluster_info.mutex);
-		if (ctx->u.cluster_info.select_pipe[0] >= 0) {
-		    	FD_CLR(ctx->u.cluster_info.select_pipe[0], fds);
-			pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-			return 1;
-		}
-		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-		return 0;
-	case MSG_SOCKET:
-		if (ctx->u.local_info.sockfd >= 0) {
-		    	FD_CLR(ctx->u.local_info.sockfd, fds);
-			return 1;
-		}
-		return 0;
-	default:
-		break;
-	}
-
+	
+	if (ctx->ops && ctx->ops->mo_fd_clr)
+		return ctx->ops->mo_fd_clr(ctx, fds);
+	errno = ENOSYS;
 	return -1;
 }
 
@@ -519,232 +134,28 @@
 int
 msg_wait(msgctx_t *ctx, int timeout)
 {
-
-	if (!ctx) {
-		errno = EINVAL;
-		return -1;
-	}
-		
-	switch(ctx->type) {
-	case MSG_CLUSTER:
-		return cluster_msg_wait(ctx, timeout);
-	case MSG_SOCKET:
-		return local_msg_wait(ctx, timeout);
-	default:
-		break;
-	}
-
 	errno = EINVAL;
-	return -1;
-}
-
-
-int
-_cluster_msg_receive(msgctx_t *ctx, void **msg, size_t *len)
-{
-	cluster_msg_hdr_t *m;
-	msg_q_t *n;
-	int ret = 0;
-
-	if (msg)
-		*msg = NULL;
-	if (len)
-		*len = 0;
-
-	if (ctx->u.cluster_info.local_ctx < 0 ||
-	    ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
-		errno = EBADF;
-		return -1;
-	}
-
-	/* trigger receive here */
-	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
-
-	n = ctx->u.cluster_info.queue;
-	if (n == NULL) {
-		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-		errno = EAGAIN;
-		return -1;
-	}
-
-	list_remove(&ctx->u.cluster_info.queue, n);
-
-	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-
-	m = n->message;
-	switch(m->msg_control) {
-	case M_CLOSE:
-		ctx->u.cluster_info.remote_ctx = 0;
-		break;
-	case M_OPEN_ACK:
-		/* Response to new connection */
-		ctx->u.cluster_info.remote_ctx = m->src_ctx;
-		break;
-	case M_DATA:
-		/* Kill the message control structure */
-		memmove(m, &m[1], n->len - sizeof(*m));
-		if (msg)
-			*msg = (void *)m;
-		else {
-			printf("Warning: dropping data message\n");
-			free(m);
-		}
-		if (len)
-			*len = (n->len - sizeof(*m));
-		ret = (n->len - sizeof(*m));
-		free(n);
-
-		printf("Message received\n");
-		return ret;
-	case M_OPEN:
-		/* Someone is trying to open a connection */
-	default:
-		/* ?!! */
-		ret = -1;
-		break;
-	}
-
-	free(m);
-	free(n);
-
-	return ret;
-}
-
-
-/**
-  Receive a message from a cluster-context.  This copies out the contents
-  into the user-specified buffer, and does random other things.
- */
-static int
-cluster_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
-{
-	int req;
-	void *priv_msg;
-	size_t priv_len;
-
-	if (!msg || !maxlen) {
-		errno = EINVAL;
-		return -1;
-	}
-
-	req = cluster_msg_wait(ctx, timeout);
-
-	switch (req) {
-	case M_DATA:
-		/* Copy out. */
-		req = _cluster_msg_receive(ctx, &priv_msg, &priv_len);
-		if (req < 0) {
-			printf("Ruh roh!\n");
-			return -1;
-		}
-
-		priv_len = (priv_len < maxlen ? priv_len : maxlen);
-
-		memcpy(msg, priv_msg, priv_len);
-		free(priv_msg);
-		return req;
-	case M_CLOSE:
-		errno = ECONNRESET;
-		return -1;
-	case 0:
-		/*printf("Nothing on queue\n");*/
-		return 0;
-	default:
-		printf("PROTOCOL ERROR: Received %d\n", req);
+	if (!ctx)
 		return -1;
-	}
-
-	printf("%s: CODE PATH ERROR\n", __FUNCTION__);
+	
+	if (ctx->ops && ctx->ops->mo_wait)
+		return ctx->ops->mo_wait(ctx, timeout);
+	errno = ENOSYS;
 	return -1;
 }
 
 
-static int
-_local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
-{
-	struct timeval tv = {0, 0};
-	struct timeval *p = NULL;
-	local_msg_hdr_t h;
-
-	if (timeout >= 0) {
-		tv.tv_sec = timeout;
-		p = &tv;
-	}
-
-	if (_read_retry(ctx->u.local_info.sockfd, &h, sizeof(h), p) < 0)
-		return -1;
-
-	if (maxlen < h.msg_len) {
-		printf("WARNING: Buffer too small for message!\n");
-		h.msg_len = maxlen;
-	}
-
-	return _read_retry(ctx->u.local_info.sockfd, msg, h.msg_len, p);
-}
-
-
-/**
-  Receive a message from a cluster-context.  This copies out the contents
-  into the user-specified buffer, and does random other things.
- */
-static int
-local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
-{
-	int req;
-	char priv_msg[4096];
-	size_t priv_len;
-
-	if (!msg || !maxlen) {
-		errno = EINVAL;
-		return -1;
-	}
-
-	switch (req) {
-	case M_DATA:
-		/* Copy out. */
-		req = _local_msg_receive(ctx, priv_msg, priv_len, timeout);
-		if (req <= 0)
-			return -1;
-
-		priv_len = (priv_len < maxlen ? priv_len : maxlen);
-
-		memcpy(msg, priv_msg, priv_len);
-		free(msg);
-		return req;
-	case M_CLOSE:
-		errno = ECONNRESET;
-		return -1;
-	case 0:
-		/*printf("Nothing on queue\n");*/
-		return 0;
-	default:
-		printf("PROTOCOL ERROR: Received %d\n", req);
-		return -1;
-	}
-
-	printf("%s: CODE PATH ERROR\n", __FUNCTION__);
-	return -1;
-}
-
 
 int
 msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
 {
-	if (!ctx || !msg || !maxlen) {
-		errno = EINVAL;
+	errno = EINVAL;
+	if (!ctx)
 		return -1;
-	}
 
-	switch(ctx->type) {
-	case MSG_CLUSTER:
-		return cluster_msg_receive(ctx, msg, maxlen, timeout);
-	case MSG_SOCKET:
-		return local_msg_receive(ctx, msg, maxlen, timeout);
-	default:
-		break;
-	}
-
-	errno = EINVAL;
+	if (ctx->ops && ctx->ops->mo_receive)
+		return ctx->ops->mo_receive(ctx, msg, maxlen, timeout);
+	errno = ENOSYS;
 	return -1;
 }
 
@@ -755,78 +166,28 @@
   /var/run/cluster...
  */
 int
-msg_open(int nodeid, int port, msgctx_t *ctx, int timeout)
+msg_open(int type, int nodeid, int port, msgctx_t *ctx, int timeout)
 {
-	int t = 0;
-
 	errno = EINVAL;
 	if (!ctx)
 		return -1;
 
-
-	/*printf("Opening pseudo channel to node %d\n", nodeid);*/
-
-	memset(ctx, 0, sizeof(*ctx));
-	if (nodeid == CMAN_NODEID_US) {
-		if ((ctx->u.local_info.sockfd = local_connect()) < 0) {
-			return -1;
-		}
-		ctx->type = MSG_SOCKET;
-		return 0;
-	}
-
-	ctx->type = MSG_CLUSTER;
-	ctx->u.cluster_info.nodeid = nodeid;
-	ctx->u.cluster_info.port = port;
-	ctx->u.cluster_info.local_ctx = -1;
-	ctx->u.cluster_info.remote_ctx = 0;
-	ctx->u.cluster_info.queue = NULL;
-	ctx->u.cluster_info.select_pipe[0] = -1;
-	ctx->u.cluster_info.select_pipe[1] = -1;
-	pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
-	pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
-
-	/* Assign context index */
-	if (assign_ctx(ctx) < 0)
-		return -1;
-
-	//printf("  Local CTX: %d\n", ctx->u.cluster_info.local_ctx);
-
-	/* Send open */
-	
-	//printf("  Sending control message M_OPEN\n");
-	if (cluster_send_control_msg(ctx, M_OPEN) < 0) {
+	/* XXX SPECIAL CASE... ow. */
+	switch(type) {
+	case MSG_SOCKET:
+		sock_msg_init(ctx);
+		break;
+	case MSG_CLUSTER:
+		cluster_msg_init(ctx);
+		break;
+	default:
 		return -1;
 	}
 
-	/* Ok, wait for a response */
-	while (!is_established(ctx)) {
-		++t;
-		if (t > timeout) {
-			msg_close(ctx);
-			errno = ETIMEDOUT;
-			return -1;
-		}
-			
-		switch(msg_wait(ctx, 1)) {
-		case M_OPEN_ACK:
-			_cluster_msg_receive(ctx, NULL, NULL);
-			break;
-		case M_NONE:
-			continue;
-		default: 
-			printf("PROTO ERROR: M_OPEN_ACK not received \n");
-		}
-	}
-
-	/*	
-	printf("  Remote CTX: %d\n",
-	       ctx->u.cluster_info.remote_ctx);
-	printf("  Pseudo channel established!\n");
-	*/
-	
-
-	return 0;
+	if (ctx->ops && ctx->ops->mo_open)
+		return ctx->ops->mo_open(ctx->type, nodeid, port, ctx, timeout);
+	errno = ENOSYS;
+	return -1;
 }
 
 
@@ -842,484 +203,65 @@
 int
 msg_close(msgctx_t *ctx)
 {
-	msg_q_t *n = NULL;
-
-	if (!ctx) {
-		errno = EINVAL;
-		return -1;
-	}
-
-	switch (ctx->type) {
-	case MSG_CLUSTER:
-		if (ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
-			errno = EINVAL;
-			return -1;
-		}
-		pthread_mutex_lock(&context_lock);
-		/* Other threads should not be able to see this again */
-		if (contexts[ctx->u.cluster_info.local_ctx])
-			contexts[ctx->u.cluster_info.local_ctx] = NULL;
-		pthread_mutex_unlock(&context_lock);
-		/* Clear receive queue */
-		while ((n = ctx->u.cluster_info.queue) != NULL) {
-			list_remove(&ctx->u.cluster_info.queue, n);
-			free(n->message);
-			free(n);
-		}
-		/* Send close message */
-		if (ctx->u.cluster_info.remote_ctx != 0) {
-			cluster_send_control_msg(ctx, M_CLOSE);
-		}
-
-		/* Close pipe if it's open */
-		if (ctx->u.cluster_info.select_pipe[0] >= 0) {
-			close(ctx->u.cluster_info.select_pipe[0]);
-			ctx->u.cluster_info.select_pipe[0] = -1;
-		}
-		if (ctx->u.cluster_info.select_pipe[1] >= 0) {
-			close(ctx->u.cluster_info.select_pipe[1]);
-			ctx->u.cluster_info.select_pipe[1] = -1;
-		}
-		return 0;
-	case MSG_SOCKET:
-		close(ctx->u.local_info.sockfd);
-		ctx->u.local_info.sockfd = -1;
-		return 0;
-	default:
-		break;
-	}
-
 	errno = EINVAL;
-	return -1;
-}
-
-
-/**
-  Called by cman_dispatch to deal with messages coming across the
-  cluster socket.  This function deals with fanning out the requests
-  and putting them on the per-context queues.  We don't have
-  the benefits of pre-configured buffers, so we need this.
- */
-static void
-process_cman_msg(cman_handle_t h, void *priv, char *buf, int len,
-	    uint8_t port, int nodeid)
-{
-	cluster_msg_hdr_t *m = (cluster_msg_hdr_t *)buf;
-	msg_q_t *node;
-	msgctx_t *ctx;
-
-	if (len < sizeof(*m)) {
-		printf("Message too short.\n");
-		return;
-	}
-
-	swab_cluster_msg_hdr_t(m);
-
-#if 0
-	printf("Processing ");
-	switch(m->msg_control) {
-	case M_NONE: 
-		printf("M_NONE\n");
-		break;
-	case M_OPEN:
-		printf("M_OPEN\n");
-		break;
-	case M_OPEN_ACK: 
-		printf("M_OPEN_ACK\n");
-		break;
-	case M_DATA: 
-		printf("M_DATA\n");
-		break;
-	case M_CLOSE: 
-		printf("M_CLOSE\n");
-		break;
-	}
-
-	printf("  Node ID: %d %d\n", m->src_nodeid, nodeid);
-	printf("  Remote CTX: %d  Local CTX: %d\n", m->src_ctx, m->dest_ctx);
-#endif
-
-	if (m->dest_ctx >= MAX_CONTEXTS) {
-		printf("Context invalid; ignoring\n");
-		return;
-	}
-
-	while ((node = malloc(sizeof(*node))) == NULL) {
-		sleep(1);
-	}
-	memset(node, 0, sizeof(*node));
-	while ((node->message = malloc(len)) == NULL) {
-		sleep(1);
-	}
-	memcpy(node->message, buf, len);
-	node->len = len;
-
-	pthread_mutex_lock(&context_lock);
-	ctx = contexts[m->dest_ctx];
-	if (!ctx) {
-		/* We received a close for something we've already
-		   detached from our list.  No big deal, just
-		   ignore. */
-		free(node->message);
-		free(node);
-		pthread_mutex_unlock(&context_lock);
-		return;
-	}
-
-	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
-	list_insert(&ctx->u.cluster_info.queue, node);
-	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-	/* If a select pipe was set up, wake it up */
-	if (ctx->u.cluster_info.select_pipe[1] >= 0)
-		write(ctx->u.cluster_info.select_pipe[1], "", 1);
-	pthread_mutex_unlock(&context_lock);
-
-	pthread_cond_signal(&ctx->u.cluster_info.cond);
-}
-
-
-/**
-  Accept a new pseudo-private connection coming in over the
-  cluster socket.
- */
-static int
-cluster_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
-{
-	errno = EINVAL;
-	cluster_msg_hdr_t *m;
-	msg_q_t *n;
-	char done = 0;
-	char foo;
-
-	if (!listenctx || !acceptctx)
-		return -1;
-	if (listenctx->u.cluster_info.local_ctx != 0)
-		return -1;
-
-	pthread_mutex_lock(&listenctx->u.cluster_info.mutex);
-
-	n = listenctx->u.cluster_info.queue;
-	if (n == NULL) {
-		pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
-		errno = EAGAIN;
+	if (!ctx)
 		return -1;
-	}
-
-	/* the OPEN should be the first thing on the list; this loop
-	   is probably not necessary */
-	list_do(&listenctx->u.cluster_info.queue, n) {
-
-		m = n->message;
-		switch(m->msg_control) {
-		case M_OPEN:
-			list_remove(&listenctx->u.cluster_info.queue, n);
-			/*printf("Accepting connection from %d %d\n",
-			  	 m->src_nodeid, m->src_ctx);*/
-
-			/* New connection */
-			pthread_mutex_init(&acceptctx->u.cluster_info.mutex,
-					   NULL);
-			pthread_cond_init(&acceptctx->u.cluster_info.cond,
-					  NULL);
-			acceptctx->u.cluster_info.queue = NULL;
-			acceptctx->u.cluster_info.remote_ctx = m->src_ctx;
-			acceptctx->u.cluster_info.nodeid = m->src_nodeid;
-			acceptctx->u.cluster_info.port = m->msg_port;
-
-			assign_ctx(acceptctx);
-			cluster_send_control_msg(acceptctx, M_OPEN_ACK);
-
-			if (listenctx->u.cluster_info.select_pipe[0] >= 0) {
-				read(listenctx->u.cluster_info.select_pipe[0],
-				     &foo, 1);
-			}
-
-			done = 1;
-			free(m);
-			free(n);
-
-			break;
-		case M_DATA:
-			/* Data messages (i.e. from broadcast msgs) are
-			   okay too!...  but we don't handle them here */
-			break;
-		default:
-			/* Other message?! */
-			printf("Odd... %d\n", m->msg_control);
-			break;
-		}
-
-		if (done)
-			break;
-
-	} while (!list_done(&listenctx->u.cluster_info.queue, n));
-
-	pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
-
-	return 0;
+	if (ctx->ops && ctx->ops->mo_close)
+		return ctx->ops->mo_close(ctx);
+	errno = ENOSYS;
+	return -1;
 }
 
 
-/* XXX INCOMPLETE */
 int
 msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
 {
-	switch(listenctx->type) {
-	case MSG_CLUSTER:
-		return cluster_msg_accept(listenctx, acceptctx);
-	case MSG_SOCKET:
-		return 0;
-	default:
-		break;
-	}
-
-	return -1;
-}
-
-
-static int
-local_listener_sk(void)
-{
-	int sock;
-	struct sockaddr_un su;
-	mode_t om;
-
-	sock = socket(PF_LOCAL, SOCK_STREAM, 0);
-	if (sock < 0)
+	errno = EINVAL;
+	if (!listenctx || !acceptctx)
 		return -1;
-
-	unlink(RGMGR_SOCK);
-	om = umask(077);
-	su.sun_family = PF_LOCAL;
-	snprintf(su.sun_path, sizeof(su.sun_path), RGMGR_SOCK);
-
-	if (bind(sock, &su, sizeof(su)) < 0) {
-		umask(om);
-		goto fail;
-	}
-	umask(om);
-
-	if (listen(sock, SOMAXCONN) < 0)
-		goto fail;
-
-	return sock;
-fail:
-	if (sock >= 0)
-		close(sock);
+	if (listenctx->ops && listenctx->ops->mo_accept)
+		return listenctx->ops->mo_accept(listenctx, acceptctx);
+	errno = ENOSYS;
 	return -1;
 }
 
 
-/**
-  This waits for events on the cluster comms FD and
-  dispatches them using cman_dispatch.  Initially,
-  the design had no permanent threads, but that model
-  proved difficult to implement correctly.
- */
-static void *
-cluster_comms_thread(void *arg)
-{
-	while (thread_running) {
-		poll_cluster_messages(2);
-	}
-
-	return NULL;
-}
-
-
-/*
-   Transliterates a CMAN event to a control message
- */
-static void
-process_cman_event(cman_handle_t handle, void *private, int reason, int arg)
-{
-	cluster_msg_hdr_t *msg;
-	int *argp;
-	msg_q_t *node;
-	msgctx_t *ctx;
-
-	/* Allocate queue node */
-	while ((node = malloc(sizeof(*node))) == NULL) {
-		sleep(1);
-	}
-	memset(node, 0, sizeof(*node));
-
-	/* Allocate message: header + int (for arg) */
-	while ((msg = malloc(sizeof(int) +
-			     sizeof(cluster_msg_hdr_t))) == NULL) {
-		sleep(1);
-	}
-	memset(msg, 0, sizeof(int)+sizeof(cluster_msg_hdr_t));
-
-
-	switch(reason) {
-#if defined(LIBCMAN_VERSION) && LIBCMAN_VERSION >= 2
-	case CMAN_REASON_PORTOPENED:
-		msg->msg_control = M_PORTOPENED;
-		break;
-	case CMAN_REASON_TRY_SHUTDOWN:
-		msg->msg_control = M_TRY_SHUTDOWN;
-		break;
-#endif
-	case CMAN_REASON_PORTCLOSED:
-		msg->msg_control = M_PORTCLOSED;
-		break;
-	case CMAN_REASON_STATECHANGE:
-		msg->msg_control = M_STATECHANGE;
-		break;
-	}
-
-	argp = ((void *)msg + sizeof(cluster_msg_hdr_t));
-	*argp = arg;
-
-	node->len = sizeof(cluster_msg_hdr_t) + sizeof(int);
-	node->message = msg;
-
-	pthread_mutex_lock(&context_lock);
-	ctx = contexts[0]; /* This is the cluster context... */
-	if (!ctx) {
-		/* We received a close for something we've already
-		   detached from our list.  No big deal, just
-		   ignore. */
-		free(node->message);
-		free(node);
-		pthread_mutex_unlock(&context_lock);
-		return;
-	}
-
-	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
-	list_insert(&ctx->u.cluster_info.queue, node);
-	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-	/* If a select pipe was set up, wake it up */
-	if (ctx->u.cluster_info.select_pipe[1] >= 0)
-		write(ctx->u.cluster_info.select_pipe[1], "", 1);
-	pthread_mutex_unlock(&context_lock);
-
-	pthread_cond_signal(&ctx->u.cluster_info.cond);
-}
-
-
-/* XXX INCOMPLETE */
+/* XXX Special case */
 int
-msg_init(chandle_t *ch)
+msg_listen(int type, void *port, int me, msgctx_t **ctx)
 {
-	int e;
-	pthread_attr_t attrs;
-	msgctx_t *ctx;
-
-	pthread_mutex_lock(&ch->c_lock);
-
-	/* Set up local context */
-
-	ctx = msg_new_ctx();
-	if (!ctx) {
-		pthread_mutex_unlock(&ch->c_lock);
+	errno = EINVAL;
+	if (!me)
 		return -1;
-	}
-
-	ctx->type = MSG_SOCKET;
-	ctx->u.local_info.sockfd = local_listener_sk();
-	ctx->u.local_info.flags = SKF_LISTEN;
-
-	ch->local_ctx = ctx;
-
-	ctx = msg_new_ctx();
-
-	if (!ctx) {
-		pthread_mutex_unlock(&ch->c_lock);
-		msg_free_ctx((msgctx_t *)ch->local_ctx);
+	if (type == MSG_NONE)
 		return -1;
-	}
-
-	gch = ch;
-
-	if (cman_start_recv_data(ch->c_cluster, process_cman_msg,
-				 RG_PORT) != 0) {
-		e = errno;
-		msg_close(ch->local_ctx);
-		pthread_mutex_unlock(&ch->c_lock);
-		msg_free_ctx((msgctx_t *)ch->local_ctx);
-		msg_free_ctx((msgctx_t *)ch->cluster_ctx);
-		errno = e;
+	if (!ctx)
 		return -1;
-	}
 
-	if (cman_start_notification(ch->c_cluster, process_cman_event) != 0) {
-		e = errno;
-		msg_close(ch->local_ctx);
-		pthread_mutex_unlock(&ch->c_lock);
-		msg_free_ctx((msgctx_t *)ch->local_ctx);
-		msg_free_ctx((msgctx_t *)ch->cluster_ctx);
-		errno = e;
+	if (type == MSG_CLUSTER) {
+		return cluster_msg_listen(me, port, ctx);
+	} else if (type == MSG_SOCKET) {
+		return sock_msg_listen(me, port, ctx);
 	}
 
-	ch->cluster_ctx = ctx;
-	pthread_mutex_unlock(&ch->c_lock);
-
-	pthread_mutex_lock(&context_lock);
-
-	memset(contexts, 0, sizeof(contexts));
-	contexts[0] = ctx;
-
-	ctx->type = MSG_CLUSTER;
-	ctx->u.cluster_info.port = RG_PORT; /* port! */
-	ctx->u.cluster_info.nodeid = 0; /* Broadcast! */
-	ctx->u.cluster_info.select_pipe[0] = -1;
-	ctx->u.cluster_info.select_pipe[1] = -1;
-	pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
-	pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
-	pthread_mutex_unlock(&context_lock);
-
-       	pthread_attr_init(&attrs);
-       	pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
-       	pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
-
-	thread_running = 1;	
-	pthread_create(&comms_thread, &attrs, cluster_comms_thread, NULL);
-
-
-	pthread_attr_destroy(&attrs);
-
-	return 0;
+	return -1;
 }
 
 
-int
-msg_print_ctx(int ctx)
+void
+msg_print(msgctx_t *ctx)
 {
-	if (!contexts[ctx])
-		return -1;
-
-	printf("Cluster Message Context %d\n", ctx);
-	printf("  Node ID %d\n", contexts[ctx]->u.cluster_info.nodeid);
-	printf("  Remote %d\n", contexts[ctx]->u.cluster_info.remote_ctx);
-	return 0;
+	if (ctx->ops && ctx->ops->mo_print)
+		return ctx->ops->mo_print(ctx);
 }
 
 
 /* XXX INCOMPLETE */
 int
-msg_shutdown(chandle_t *ch)
+msg_shutdown(void)
 {
-	if (!ch) {
-		errno = EINVAL;
-		return -1;
-	}
-
-	while (pthread_kill(comms_thread, 0) == 0)
-		sleep(1);
-
-	pthread_mutex_lock(&ch->c_lock);
-
-	/* xxx purge everything */
-	msg_close(ch->local_ctx);
-	cman_end_recv_data(ch->c_cluster);
-
-	msg_free_ctx(ch->local_ctx);
-	msg_free_ctx(ch->cluster_ctx);
-
-
-	pthread_mutex_unlock(&ch->c_lock);
+	sock_msg_shutdown();
+	cluster_msg_shutdown();
 
 	return 0;
 }
@@ -1337,7 +279,6 @@
 {
 	msgctx_t *p;
 	
-	printf("Alloc %d\n", sizeof(msgctx_t));
 	p = malloc(sizeof(msgctx_t));
 	if (!p)
 		return NULL;
@@ -1354,4 +295,3 @@
 {
 	free(dead);
 }
-
--- cluster/rgmanager/src/clulib/msgsimple.c	2006/06/02 17:37:10	1.5
+++ cluster/rgmanager/src/clulib/msgsimple.c	2006/07/11 23:52:41	1.6
@@ -76,9 +76,8 @@
 msg_receive_simple(msgctx_t *ctx, generic_msg_hdr ** buf, int timeout)
 {
 	int ret;
-	char msgbuf[16384];
+	char msgbuf[4096];
 	generic_msg_hdr *peek_msg = (generic_msg_hdr *)msgbuf;
-	int size;
 
 	/*
 	 * Peek at the header.  We need the size of the inbound buffer!
@@ -102,6 +101,7 @@
 		return -1;
 	}
 
+	/* Decode so we know how much to allocate */
 	swab_generic_msg_hdr(peek_msg);
 	if (peek_msg->gh_magic != GENERIC_HDR_MAGIC) {
 		fprintf(stderr, "Invalid magic: Wanted 0x%08x, got 0x%08x\n",
@@ -113,14 +113,15 @@
 	 * allocate enough memory to receive the header + diff buffer
 	 */
 	*buf = malloc(peek_msg->gh_length);
-
 	if (!*buf) {
 		fprintf(stderr, "%s: malloc: %s", __FUNCTION__,
 		       strerror(errno));
 		return -1;
 	}
-	memset(*buf, 0, peek_msg->gh_length);
-	memcpy(*buf, msgbuf + sizeof(generic_msg_hdr), peek_msg->gh_length);
+	memcpy(*buf, msgbuf, peek_msg->gh_length);
+
+	/* Put it back into the original order... */
+	swab_generic_msg_hdr((generic_msg_hdr *)(*buf));
 
 	return ret;
 }
--- cluster/rgmanager/src/clulib/rg_strings.c	2005/03/07 17:25:12	1.3
+++ cluster/rgmanager/src/clulib/rg_strings.c	2006/07/11 23:52:41	1.4
@@ -1,3 +1,21 @@
+/*
+  Copyright Red Hat, Inc. 2004-2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge, 
+  MA 02139, USA.
+*/
 const char *rg_state_strings[] = {
 	"stopped",
 	"starting",
--- cluster/rgmanager/src/clulib/signals.c	2004/08/13 15:36:51	1.1
+++ cluster/rgmanager/src/clulib/signals.c	2006/07/11 23:52:41	1.2
@@ -1,3 +1,21 @@
+/*
+  Copyright Red Hat, Inc. 2003-2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge, 
+  MA 02139, USA.
+*/
 #include <signal.h>
 #include <stdlib.h>
 #include <string.h>
--- cluster/rgmanager/src/clulib/vft.c	2006/06/02 17:37:10	1.13
+++ cluster/rgmanager/src/clulib/vft.c	2006/07/11 23:52:41	1.14
@@ -1,5 +1,5 @@
 /*
-  Copyright Red Hat, Inc. 2002-2003
+  Copyright Red Hat, Inc. 2002-2006
 
   This program is free software; you can redistribute it and/or modify it
   under the terms of the GNU General Public License as published by the
@@ -40,12 +40,8 @@
 #include <stdio.h>
 #include <assert.h>
 #include <signals.h>
+#include <lock.h>
 
-
-int clu_lock_verbose(char *lockname, int flags, void **lockpp);
-
-static int vf_lfds[2];
-static int vf_lfd = 0;
 static key_node_t *key_list = NULL;	/** List of key nodes. */
 static int _node_id = (int)-1;/** Our node ID, set with vf_init. */
 static uint16_t _port = 0;		/** Our daemon ID, set with vf_init. */
@@ -57,7 +53,7 @@
 static pthread_mutex_t key_list_mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_mutex_t vf_mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_t vf_thread = (pthread_t)-1;
-static int thread_ready = 0;
+static int vf_thread_ready = 0;
 static vf_vote_cb_t default_vote_cb = NULL;
 static vf_vote_cb_t default_commit_cb = NULL;
 
@@ -65,43 +61,42 @@
 /*
  * Internal Functions
  */
-static int send_to_all(msgctx_t *peer_ctx, int32_t command, int arg1, int arg2,
-		       int log_errors);
-static int vf_send_abort(msgctx_t *ctx);
-static int vf_send_commit(msgctx_t *ctx);
-static void close_all(msgctx_t *fds);
+static int _send_simple(msgctx_t *ctx, int32_t command, int arg1, int arg2,
+		        int log_errors);
+static int vf_send_abort(msgctx_t *ctx, uint32_t trans);
+static int vf_send_commit(msgctx_t *ctx, uint32_t trans);
 static key_node_t * kn_find_key(char *keyid);
-static key_node_t * kn_find_fd(msgctx_t *ctx);
-static int vf_handle_join_view_msg(msgctx_t *ctx, vf_msg_t * hdrp);
+static key_node_t * kn_find_trans(uint32_t trans);
+static int vf_handle_join_view_msg(msgctx_t *ctx, int nodeid, vf_msg_t * hdrp);
 static int vf_resolve_views(key_node_t *key_node);
-static int vf_unanimous(msgctx_t *peer_ctx, int remain, int timeout);
-static view_node_t * vn_new(msgctx_t *ctx, uint32_t nodeid, int viewno,
+static int vf_unanimous(msgctx_t *ctx, int trans, int remain, int timeout);
+static view_node_t * vn_new(uint32_t trans, uint32_t nodeid, int viewno,
 			    void *data, uint32_t datalen);
 static int vf_request_current(cluster_member_list_t *membership, char *keyid,
-		   	      int *viewno, void **data, uint32_t *datalen);
-static int _vf_purge(key_node_t *key_node, msgctx_t **fd);
+		   	      uint64_t *viewno, void **data, uint32_t *datalen);
+static int _vf_purge(key_node_t *key_node, uint32_t *trans);
 
 /* Join-view buffer list functions */
 static int vn_cmp(view_node_t *left, view_node_t *right);
 static int vn_insert_sorted(view_node_t **head, view_node_t *node);
-static view_node_t * vn_remove(view_node_t **head, msgctx_t *ctx);
-static int vf_buffer_join_msg(int fd, vf_msg_t *hdr,
+static view_node_t * vn_remove(view_node_t **head, uint32_t trans);
+static int vf_buffer_join_msg(vf_msg_t *hdr,
 			      struct timeval *timeout);
 
 /* Commits buffer list functions */
 static int vc_cmp(commit_node_t *left, commit_node_t *right);
 static int vc_insert_sorted(commit_node_t **head, commit_node_t *node);
-static commit_node_t * vc_remove(commit_node_t **head, int fd);
-static int vf_buffer_commit(int fd);
+static commit_node_t * vc_remove(commit_node_t **head, uint32_t trans);
+static int vf_buffer_commit(uint32_t trans);
 
 /* Simple functions which client calls to vote/abort */
-static int vf_vote_yes(msgctx_t *ctx);
-static int vf_vote_no(msgctx_t *ctx);
-static int vf_abort(msgctx_t *ctx);
+static int vf_vote_yes(msgctx_t *ctx, uint32_t trans);
+static int vf_vote_no(msgctx_t *ctx, uint32_t trans);
+static int vf_abort(uint32_t trans);
 static int tv_cmp(struct timeval *left, struct timeval *right);
 
 /* Resolution */
-static int vf_try_commit(key_node_t *key_node);
+static uint32_t vf_try_commit(key_node_t *key_node);
 
 int vf_init(int my_node_id, uint16_t my_port,
 	    vf_vote_cb_t vote_cb, vf_commit_cb_t commit_cb);
@@ -111,7 +106,7 @@
 			  vf_commit_cb_t commit_cb);
 int vf_write(cluster_member_list_t *memberhip, uint32_t flags,
 	     char *keyid, void *data, uint32_t datalen);
-int vf_process_msg(int handle, generic_msg_hdr *msgp, int nbytes);
+int vf_process_msg(msgctx_t *ctx, int nodeid, generic_msg_hdr *msgp, int nbytes);
 int vf_end(char *keyid);
 int vf_read(cluster_member_list_t *membership, char *keyid, uint64_t *view,
 	    void **data, uint32_t *datalen);
@@ -123,14 +118,14 @@
 struct vf_args {
 	uint16_t port;
 	int local_node_id;
+	msgctx_t *ctx;
 };
 
 
 static int
-send_to_all(msgctx_t *peer_ctx, int32_t command, int arg1, int arg2, int log_errors)
+_send_simple(msgctx_t *ctx, int32_t command, int arg1, int arg2, int log_errors)
 {
 	generic_msg_hdr hdr;
-	int x, rv = 0;
 
 	hdr.gh_magic = GENERIC_HDR_MAGIC;
 	hdr.gh_length = sizeof(hdr);
@@ -140,51 +135,27 @@
 
 	swab_generic_msg_hdr(&hdr);
 
-	for (x=0; peer_ctx[x].type != -1; x++) {
-		if (msg_send(&peer_ctx[x], &hdr, sizeof(hdr)) == sizeof(hdr))
-			continue;
-
-		if (log_errors) {
-#if 0
-			clulog(LOG_ERR, "#14: Failed to send %d "
-			       "bytes to %d!\n", sizeof(hdr),
-			       x);
-#endif
-		}
-		rv = -1;
-	}
-
-	return rv;
+	return msg_send(ctx, &hdr, sizeof(hdr));
 }
 
 
 static int 
-vf_send_abort(msgctx_t *ctx)
+vf_send_abort(msgctx_t *ctx, uint32_t trans)
 {
 #ifdef DEBUG
-	printf("VF: Broadcasting ABORT\n");
+	printf("VF: Broadcasting ABORT (X#%08x)\n", trans);
 #endif
-	return send_to_all(ctx, VF_MESSAGE, VF_ABORT, 0, 0);
+	return _send_simple(ctx, VF_MESSAGE, VF_ABORT, trans, 0);
 }
 
 
 static int
-vf_send_commit(msgctx_t *ctx)
+vf_send_commit(msgctx_t *ctx, uint32_t trans)
 {
 #ifdef DEBUG
 	printf("VF: Broadcasting FORMED\n");
 #endif
-	return send_to_all(ctx, VF_MESSAGE, VF_VIEW_FORMED, 0, 1);
-}
-
-
-static void
-close_all(msgctx_t *ctx)
-{
-	int x;
-	for (x = 0; ctx[x].type != -1; x++) {
-		msg_close(&ctx[x]);
-	}
+	return _send_simple(ctx, VF_MESSAGE, VF_VIEW_FORMED, trans, 1);
 }
 
 
@@ -202,14 +173,14 @@
 
 
 static key_node_t *
-kn_find_fd(msgctx_t *ctx)
+kn_find_trans(uint32_t trans)
 {
 	key_node_t *cur;
 	view_node_t *curvn;
 
 	for (cur = key_list; cur; cur = cur->kn_next)
 		for (curvn = cur->kn_jvlist; curvn; curvn = curvn->vn_next)
-			if (curvn->vn_ctx == ctx)
+			if (curvn->vn_transaction == trans)
 				return cur;
 
 	return NULL;
@@ -217,15 +188,17 @@
 
 
 static int
-vf_handle_join_view_msg(msgctx_t *ctx, vf_msg_t * hdrp)
+vf_handle_join_view_msg(msgctx_t *ctx, int nodeid, vf_msg_t * hdrp)
 {
 	struct timeval timeout;
 	key_node_t *key_node;
+	uint32_t trans;
 
+	trans = hdrp->vm_msg.vf_transaction;
 #ifdef DEBUG
-	printf("VF_JOIN_VIEW from member #%d! Key: %s #%d\n",
+	printf("VF_JOIN_VIEW from member #%d! Key: %s #%d (X#%08x)\n",
 	       hdrp->vm_msg.vf_coordinator, hdrp->vm_msg.vf_keyid,
-	       (int) hdrp->vm_msg.vf_view);
+	       (int) hdrp->vm_msg.vf_view, trans);
 #endif
 
 	pthread_mutex_lock(&key_list_mutex);
@@ -241,7 +214,7 @@
 			pthread_mutex_unlock(&key_list_mutex);
 			printf("VF: Error: Failed to initialize %s\n",
 			       hdrp->vm_msg.vf_keyid);
-			vf_vote_no(ctx);
+			vf_vote_no(ctx, trans);  
 			return VFR_ERROR;
 		}
 
@@ -258,7 +231,7 @@
 #ifdef DEBUG
 			printf("VF: Voting NO (via callback)\n");
 #endif
-			vf_vote_no(ctx);
+			vf_vote_no(ctx, trans);
 			return VFR_OK;
 		}
 	}
@@ -269,12 +242,12 @@
 	timeout.tv_sec = key_node->kn_tsec;
 	timeout.tv_usec = 0;
 
-	if (vf_buffer_join_msg(ctx, (vf_msg_t *) hdrp, &timeout)) {
+	if (vf_buffer_join_msg((vf_msg_t *) hdrp, &timeout)) {
 		pthread_mutex_unlock(&key_list_mutex);
 #ifdef DEBUG
-		printf("VF: Voting YES\n");
+		printf("VF: Voting YES (X#%08x)\n", trans);
 #endif
-		vf_vote_yes(ctx);
+		vf_vote_yes(ctx, trans);
 		return VFR_OK;
 	}
 
@@ -282,7 +255,7 @@
 #ifdef DEBUG
 	printf("VF: Voting NO\n");
 #endif
-	vf_vote_no(ctx);
+	vf_vote_no(ctx, trans);
 	return VFR_NO;
 }
 
@@ -297,13 +270,10 @@
 	int commits = 0;
 	void *data;
 	uint32_t datalen;
-	msgctx_t *ctx;
+	uint32_t trans;
 
-	while ((ctx = vf_try_commit(key_node)) != -1) {
-
-		/* XXX in general, this shouldn't kill the fd... */
+	while ((trans = vf_try_commit(key_node)) != 0) {
 		commits++;
-		msg_close(commitfd);
 	}
 
 	if (key_node->kn_commit_cb) {
@@ -327,12 +297,12 @@
 
 
 static int
-vf_unanimous(msgctx_t *peer_ctx, int remain, int timeout)
+vf_unanimous(msgctx_t *mcast_ctx, int trans, int remain,
+	     int timeout)
 {
 	generic_msg_hdr response;
 	struct timeval tv;
-	fd_set rfds;
-	int nready, x, max;
+	int x;
 
 	/* Set up for the select */
 	tv.tv_sec = timeout;
@@ -346,69 +316,62 @@
 	 * Flag hosts which we received messages from so we don't
 	 * read a second message.
 	 */
-	while (remain) {
-		FD_ZERO(&rfds);
-		for (x = 0; peer_ctx[x].type != M_NONE; x++)
-			msg_fd_set(&peer_ctx[x], &rfds, &max);
-
-		nready = select(max + 1, &rfds, NULL, NULL, &tv);
-		if (nready <= -1) {
-			if (nready == 0)
-				printf("VF Abort: Timed out!\n");
-			else
-				printf("VF Abort: %s\n",
-				       strerror(errno));
-			return 0;
+	while (remain && timeout) {
+
+		if (msg_wait(mcast_ctx, 5) <= 0) {
+			--timeout;
+			continue;
 		}
 
-		for (x = 0; (peer_ctx[x].type != M_NONE) && nready; x++) {
-			if (!msg_fd_isset(&peer_fds[x], &rfds))
-				continue;
+		x = msg_receive(mcast_ctx, &response, sizeof(response), 1);
+		if (x < sizeof(response))
+			continue;
+		
+		/*
+		 * Decode & validate message
+		 */
+		swab_generic_msg_hdr(&response);
+		if ((response.gh_magic != GENERIC_HDR_MAGIC) ||
+		    (response.gh_command != VF_MESSAGE)) {
+			/* Don't process anything but votes */
+			continue;
+		}
 
-			remain--;
-			nready--;
-			/*
-			 * Get reply from node x. XXX 1 second timeout?
-			 */
-			if (msg_receive(&peer_ctx[x], &response,
-						sizeof(response),
-						1) == -1) {
-				printf("VF: Abort: Timed out during "
-				       "receive from fd #%p\n", &peer_ctx[x]);
-				return 0;
-			}
-			
-			/*
-			 * Decode & validate message
-			 */
-			swab_generic_msg_hdr(&response);
-			if ((response.gh_magic != GENERIC_HDR_MAGIC) ||
-			    (response.gh_command != VF_MESSAGE) ||
-			    (response.gh_arg1 != VF_VOTE)) {
-				printf("VF: Abort: Invalid header in"
-				       " reply from fd #%p\n", &peer_fds[x]);
-				return 0;
-			}
-			
+		if (vf_command(response.gh_arg1) != VF_VOTE)
+			/* Don't process anything but votes */
+			continue;
+
+		if (response.gh_arg2 != trans)
+			continue;
+
+		/*
+		 * If we get a 'NO', we are done.
+		 */
+		if (!(vf_flags(response.gh_arg1) & VFMF_AFFIRM)) {
 			/*
-			 * If we get a 'NO', we are done.
+			 * XXX ok, it might be a mangled message;
+			 * treat it as no anyway!
 			 */
-			if (response.gh_arg2 != 1) {
-				/*
-				 * XXX ok, it might be a mangled message;
-				 * treat it as no anyway!
-				 */
-				printf("VF: Abort: fd #%d voted NO\n",
-				       peer_fds[x]);
-				return 0;
-			}
-
 #ifdef DEBUG
-			printf("VF: fd #%d voted YES\n", peer_fds[x]);
+			printf("VF: Abort: someone voted NO\n");
 #endif
+			return 0;
 		}
+
+#ifdef DEBUG
+		printf("VF: YES\n");
+#endif
+		--remain;
 	}
 
+	if (remain) {
+#ifdef DEBUG
+		printf("VF: Timed out waiting for %d responses\n", remain);
+#endif
+		return 0;
+	}
+		
+
 	/*
 	 * Whohoooooooo!
 	 */
@@ -420,7 +383,7 @@
  * ...
  */
 static view_node_t *
-vn_new(msgctx_t *ctx, uint32_t nodeid, int viewno, void *data,
+vn_new(uint32_t trans, uint32_t nodeid, int viewno, void *data,
        uint32_t datalen)
 {
 	view_node_t *new;
@@ -433,7 +396,7 @@
 
 	memset(new,0,totallen);
 
-	new->vn_ctx = ctx;
+	new->vn_transaction = trans;
 	new->vn_nodeid = nodeid;
 	new->vn_viewno = viewno;
 	new->vn_datalen = datalen;
@@ -502,7 +465,7 @@
 
 
 static view_node_t *
-vn_remove(view_node_t **head, msgctx_t *ctx)
+vn_remove(view_node_t **head, uint32_t trans)
 {
 	view_node_t *cur = *head, *back = NULL;
 
@@ -510,7 +473,7 @@
 		return NULL;
 
 	do {
-		if (cur->vn_ctx == ctx) {
+		if (cur->vn_transaction == trans) {
 			if (back) {
 				back->vn_next = cur->vn_next;
 				cur->vn_next = NULL;
@@ -537,7 +500,7 @@
  * (b) we don't receive any messages.
  */
 static int
-vf_buffer_join_msg(msgctx_t *ctx, vf_msg_t *hdr, struct timeval *timeout)
+vf_buffer_join_msg(vf_msg_t *hdr, struct timeval *timeout)
 {
 	key_node_t *key_node;
 	view_node_t *newp;
@@ -557,7 +520,8 @@
 		return 0;
 	}
 
-	newp = vn_new(ctx, hdr->vm_msg.vf_coordinator, hdr->vm_msg.vf_view, 
+	newp = vn_new(hdr->vm_msg.vf_transaction, hdr->vm_msg.vf_coordinator,
+		      hdr->vm_msg.vf_view, 
 		      hdr->vm_msg.vf_data, hdr->vm_msg.vf_datalen);
 
 	if (timeout && (timeout->tv_sec || timeout->tv_usec)) {
@@ -585,10 +549,10 @@
 static int
 vc_cmp(commit_node_t *left, commit_node_t *right)
 {
-	if (left->vc_fd < right->vc_fd)
+	if (left->vc_transaction < right->vc_transaction)
 		return -1;
 
-	if (left->vc_fd == right->vc_fd)
+	if (left->vc_transaction == right->vc_transaction)
 		return 0;
 
 	return 1;
@@ -637,7 +601,7 @@
 
 
 static commit_node_t *
-vc_remove(commit_node_t **head, int fd)
+vc_remove(commit_node_t **head, uint32_t trans)
 {
 	commit_node_t *cur = *head, *back = NULL;
 
@@ -645,7 +609,7 @@
 		return NULL;
 
 	do {
-		if (cur->vc_fd == fd) {
+		if (cur->vc_transaction == trans) {
 			if (back) {
 				back->vc_next = cur->vc_next;
 				cur->vc_next = NULL;
@@ -671,13 +635,13 @@
  * the last 'join-view' message.
  */
 static int
-vf_buffer_commit(int fd)
+vf_buffer_commit(uint32_t trans)
 {
 	key_node_t *key_node;
 	commit_node_t *newp;
 	int rv;
 
-	key_node = kn_find_fd(fd);
+	key_node = kn_find_trans(trans);
 	if (!key_node)
 		return 0;
 
@@ -686,7 +650,7 @@
 		return 0;
 
 	newp->vc_next = NULL;
-	newp->vc_fd = fd;
+	newp->vc_transaction = trans;
 
 	rv = vc_insert_sorted(&key_node->kn_clist, newp);
 	if (!rv)
@@ -697,31 +661,32 @@
 
 
 static int
-vf_vote_yes(msgctx_t *ctx)
+vf_vote_yes(msgctx_t *ctx, uint32_t trans)
 {
-	return msg_send_simple(ctx, VF_MESSAGE, VF_VOTE, 1);
-
+	/* XXX */
+	return _send_simple(ctx, VF_MESSAGE, VF_VOTE | VFMF_AFFIRM, trans, 0);
 }
 
 
 static int
-vf_vote_no(msgctx_t *ctx)
+vf_vote_no(msgctx_t *ctx, uint32_t trans)
 {
-	return msg_send_simple(ctx, VF_MESSAGE, VF_VOTE, 0);
+	/* XXX */
+	return _send_simple(ctx, VF_MESSAGE, VF_VOTE, trans, 0);
 }
 
 
 static int
-vf_abort(msgctx_t *ctx)
+vf_abort(uint32_t trans)
 {
 	key_node_t *key_node;
 	view_node_t *cur;
 
-	key_node = kn_find_fd(ctx);
+	key_node = kn_find_trans(trans);
 	if (!key_node)
 		return -1;
 
-	cur = vn_remove(&key_node->kn_jvlist, ctx);
+	cur = vn_remove(&key_node->kn_jvlist, trans);
 	if (!cur)
 		return -1;
 
@@ -787,30 +752,30 @@
 /**
  * Try to commit views in a given key_node.
  */
-static msgctx_t *
+static uint32_t
 vf_try_commit(key_node_t *key_node)
 {
 	view_node_t *vnp;
 	commit_node_t *cmp;
-	msgctx_t *ctx = NULL;
+	uint32_t trans = 0;
 
 	if (!key_node)
-		return NULL;
+		return 0;
 
 	if (!key_node->kn_jvlist)
-		return NULL;
+		return 0;
 
-	ctx = key_node->kn_jvlist->vn_ctx;
+	trans = key_node->kn_jvlist->vn_transaction;
 		
-	cmp = vc_remove(&key_node->kn_clist, ctx);
+	cmp = vc_remove(&key_node->kn_clist, trans);
 	if (!cmp) {
 		/*printf("VF: Commit for fd%d not received yet!", fd);*/
-		return NULL;
+		return 0;
 	}
 
 	free(cmp); /* no need for it any longer */
 		
-	vnp = vn_remove(&key_node->kn_jvlist, ctx);
+	vnp = vn_remove(&key_node->kn_jvlist, trans);
 	if (!vnp) {
 		/*
 		 * But, we know it was the first element on the list?!!
@@ -835,46 +800,27 @@
 	memcpy(key_node->kn_data, vnp->vn_data, vnp->vn_datalen);
 
 	free(vnp);
-	return ctx;
+	return trans;
 }
 
 
 void
-vf_event_loop(int my_node_id)
+vf_event_loop(msgctx_t *ctx, int my_node_id)
 {
-	int max, nready, n, fd, flags;
-	struct timeval tv;
-	fd_set rfds;
+	int n;
 	generic_msg_hdr *hdrp = NULL;
 
-	FD_ZERO(&rfds);
-	max = msg_fill_fdset(&rfds, MSG_ALL, MSGP_VFS);
-
-	tv.tv_sec = 1;
-	tv.tv_usec = 0;
-	nready = select(max + 1, &rfds, NULL, NULL, &tv);
-	if (nready <= 0)
-		return;
-
-	while (nready) {
-		fd = msg_next_fd(&rfds);
-		--nready;
+	if (msg_wait(ctx, 3) != 0) {
 
-		flags = msg_get_flags(fd);
-
-		if (flags & MSG_LISTEN)
-			fd = msg_accept(fd, 1, NULL);
-
-		n = msg_receive_simple(fd, &hdrp, 5);
+		n = msg_receive_simple(ctx, &hdrp, 2);
 
 		if (n <= 0 || !hdrp) {
-			msg_close(fd);
-			continue;
+			return;
 		}
 
 		swab_generic_msg_hdr(hdrp);
 		if (hdrp->gh_command == VF_MESSAGE) {
-			if (vf_process_msg(fd, hdrp, n) == VFR_COMMIT) {
+			if (vf_process_msg(ctx, 0, hdrp, n) == VFR_COMMIT) {
 #ifdef DEBUG
 				printf("VFT: View committed\n");
 #endif
@@ -893,7 +839,7 @@
 vf_wait_ready(void)
 {
 	pthread_mutex_lock(&vf_mutex);
-	while (!thread_ready) {
+	while (!vf_thread_ready) {
 		pthread_mutex_unlock(&vf_mutex);
 		usleep(50000);
 		pthread_mutex_lock(&vf_mutex);
@@ -908,12 +854,14 @@
 	int my_node_id;
 	uint16_t port;
 	key_node_t *cur;
-	int fd;
+	uint32_t trans;
+	msgctx_t *ctx;
 
 	block_all_signals();
 
 	port = ((struct vf_args *)arg)->port;
 	my_node_id = ((struct vf_args *)arg)->local_node_id;
+	ctx = ((struct vf_args *)arg)->ctx;
 	free(arg);
 
 #ifdef DEBUG
@@ -921,29 +869,20 @@
 #endif
 
 	pthread_mutex_lock(&vf_mutex);
-#if 0
-	if ((vf_lfd = msg_listen(port, MSGP_VFS, vf_lfds, 2)) <= 0) {
-		printf("Unable to set up listen socket on port %d\n",
-		       port);
-		pthread_mutex_unlock(&vf_mutex);
-		pthread_exit(NULL);
-	}
-
-	thread_ready = 1;
+	vf_thread_ready = 1;
 	pthread_mutex_unlock(&vf_mutex);
-#endif
 
-	while (1) {
+	while (vf_thread_ready) {
 		pthread_mutex_lock(&key_list_mutex);
 		for (cur = key_list; cur; cur = cur->kn_next) {
 			/* Destroy timed-out join views */
-			while (_vf_purge(cur, &fd) != VFR_NO) {
-				msg_close(fd);
-			}
+			while (_vf_purge(cur, &trans) != VFR_NO);
 		}
 		pthread_mutex_unlock(&key_list_mutex);
-		vf_event_loop(my_node_id);
+		vf_event_loop(ctx, my_node_id);
 	}
+
+	msg_close(ctx);
 	return NULL;
 }
 
@@ -952,40 +891,43 @@
 /**
  * Initialize VF.  Initializes the View Formation sub system.
  * @param my_node_id	The node ID of the caller.
- * @param my_port	The port of the caller.
  * @return		0 on success, -1 on failure.
  */
 int
 vf_init(int my_node_id, uint16_t my_port, vf_vote_cb_t vcb,
 	vf_commit_cb_t ccb)
 {
-	struct vf_args *va;
-
+	struct vf_args *args;
+	msgctx_t *ctx;
 	if (my_node_id == (int)-1)
 		return -1;
+	
+	while((ctx = msg_new_ctx()) == NULL)
+		sleep(1);
 
-	if (my_port == 0)
-		return -1;
+	while((args = malloc(sizeof(*args))) == NULL)
+		sleep(1);
 
-	pthread_mutex_lock(&vf_mutex);
-	if (vf_thread != (pthread_t)-1) {
-		pthread_mutex_unlock(&vf_mutex);
-		return 0;
+	if (msg_open(MSG_CLUSTER, 0, my_port, ctx, 1) < 0) {
+		free(ctx);	
+		free(args);
+		return -1;
 	}
 
-	va = malloc(sizeof(*va));
-	va->local_node_id = my_node_id;
-	va->port = my_port;
+	args->port = my_port;
+	args->local_node_id = my_node_id;
+	args->ctx = ctx;
 
-	pthread_create(&vf_thread, NULL, vf_server, va);
 
-	/* Write/read needs this */
+	pthread_mutex_lock(&vf_mutex);
 	_port = my_port;
 	_node_id = my_node_id;
 	default_vote_cb = vcb;
 	default_commit_cb = ccb;
 	pthread_mutex_unlock(&vf_mutex);
 
+	pthread_create(&vf_thread, NULL, vf_server, args);
+
 	vf_wait_ready();
 
 	return 0;
@@ -998,20 +940,14 @@
 int
 vf_shutdown(void)
 {
-	int x;
 	key_node_t *c_key;
 	view_node_t *c_jv;
 	commit_node_t *c_cn;
 
 	pthread_mutex_lock(&vf_mutex);
+	vf_thread_ready = 0;
 	pthread_cancel(vf_thread);
 	pthread_join(vf_thread, NULL);
-	thread_ready = 0;
-	vf_thread = (pthread_t)0;
-
-	for (x = 0 ; x < vf_lfd; x++)
-		msg_close(vf_lfds[x]);
-
 	_port = 0;
 	_node_id = (int)-1;
 	pthread_mutex_lock(&key_list_mutex);
@@ -1019,7 +955,6 @@
 	while ((c_key = key_list) != NULL) {
 
 		while ((c_jv = c_key->kn_jvlist) != NULL) {
-			msg_close(c_jv->vn_ctx);
 			key_list->kn_jvlist = c_jv->vn_next;
 			free(c_jv);
 		}
@@ -1120,7 +1055,7 @@
 
 vf_msg_t *
 build_vf_data_message(int cmd, char *keyid, void *data, uint32_t datalen,
-		      int viewno, uint32_t *retlen)
+		      int viewno, int trans, uint32_t *retlen)
 {
 	uint32_t totallen;
 	vf_msg_t *msg;
@@ -1142,6 +1077,7 @@
 
 	/* Data */
 	strncpy(msg->vm_msg.vf_keyid,keyid,sizeof(msg->vm_msg.vf_keyid));
+	msg->vm_msg.vf_transaction = trans;
 	msg->vm_msg.vf_datalen = datalen;
 	msg->vm_msg.vf_coordinator = _node_id;
 	msg->vm_msg.vf_view = viewno;
@@ -1171,89 +1107,52 @@
 vf_write(cluster_member_list_t *membership, uint32_t flags, char *keyid,
 	 void *data, uint32_t datalen)
 {
-	int nodeid;
-	msgctx_t *peer_ctx;
-	int count;
+	msgctx_t everyone;
 	key_node_t *key_node;
 	vf_msg_t *join_view;
-	int remain = 0, x, y, rv = 1;
+	int remain = 0, x, y, rv = VFR_ERROR;
 	uint32_t totallen;
+#ifdef DEBUG
 	struct timeval start, end, dif;
-	void *lockp = NULL;
+#endif
+	struct dlm_lksb lockp;
 	int l;
 	char lock_name[256];
+	static uint32_t trans = 0;
 
 	if (!data || !datalen || !keyid || !strlen(keyid) || !membership)
 		return -1;
 
+
 	pthread_mutex_lock(&vf_mutex);
+	if (!trans) {
+		trans = _node_id << 16;
+	}
+	++trans;
+
 	/* Obtain cluster lock on it. */
 	snprintf(lock_name, sizeof(lock_name), "usrm::vf");
-	l = clu_lock_verbose(lock_name, CLK_EX, &lockp);
+	l = clu_lock(LKM_EXMODE, &lockp, 0, lock_name);
 	if (l < 0) {
-		clu_unlock(lock_name, lockp);
+		clu_unlock(&lockp);
 		pthread_mutex_unlock(&vf_mutex);
 		return l;
 	}
 
-	/* set to -1 */
-	count = sizeof(msgctx_t) * (membership->cml_count + 1);
-	peer_ctx = malloc(count);
-	if(!peer_ctx) {
-		pthread_mutex_unlock(&vf_mutex);
-		return -1;
-	}
-
-	memset(peer_ctx, 0, sizeof(msgctx_t) * (membership->cml_count +1));
-	for (x = 0; x < (membership->cml_count + 1); x++) {
-		peer_ctx[x].type = M_NONE;
-	}
+#ifdef DEBUG
 	getuptime(&start);
+#endif
 
-retry_top:
-	/*
-	 * Connect to everyone, except ourself.  We separate this from the
-	 * initial send cycle because the connect cycle can cause timeouts
-	 * within the code - ie, if a node is down, it is likely the connect
-	 * will take longer than the client is expecting to wait for the
-	 * commit/abort messages!
-	 *
-	 * We assume we're up.  Since challenge-response needs both
-	 * processes to be operational...
-	 */
+	remain = 0;
 	for (x = 0, y = 0; x < membership->cml_count; x++) {
-		if (!memb_online(membership,
-				 membership->cml_members[x].cn_nodeid)) {
-			continue;
+		if (membership->cml_members[x].cn_member) {
+			remain++;
 		}
+	}
 
-		if (peer_fds[x].type != M_NONE)
-			continue;
-
-		nodeid = membership->cml_members[x].cn_nodeid;
-#ifdef DEBUG
-		printf("VF: Connecting to member #%d\n", (int)nodeid);
-		fflush(stdout);
-#endif
-
-		if (msg_open(nodeid, _port, &peer_ctx[y], 15) != 0) {
 #ifdef DEBUG
-			printf("VF: Connect to %d failed: %s\n", (int)nodeid,
-			       strerror(errno));
+	printf("aight, need responses from %d guys\n", remain);
 #endif
-			if (flags & VFF_RETRY)
-				goto retry_top;
-			if (flags & VFF_IGN_CONN_ERRORS)
-				continue;
-			free(peer_ctx);
-
-			clu_unlock(lock_name, lockp);
-			pthread_mutex_unlock(&vf_mutex);
-			return -1;
-		}
-
-		++y;
-	}
 
 	pthread_mutex_lock(&key_list_mutex);
 	key_node = kn_find_key(keyid);
@@ -1261,7 +1160,7 @@
 
 		if ((vf_key_init_nt(keyid, 10, NULL, NULL) < 0)) {
 			pthread_mutex_unlock(&key_list_mutex);
-			clu_unlock(lock_name, lockp);
+			clu_unlock(&lockp);
 			pthread_mutex_unlock(&vf_mutex);
 			return -1;
 		}
@@ -1270,19 +1169,19 @@
 	}
 
 	join_view = build_vf_data_message(VF_JOIN_VIEW, keyid, data, datalen,
-					  key_node->kn_viewno+1, &totallen);
+					  key_node->kn_viewno+1, trans, &totallen);
 
 	pthread_mutex_unlock(&key_list_mutex);
 
 	if (!join_view) {
-		clu_unlock(lock_name, lockp);
+		clu_unlock(&lockp);
 		pthread_mutex_unlock(&vf_mutex);
 		return -1;
 	}
 
 #ifdef DEBUG
-	printf("VF: Push %d.%d #%d\n", (int)_node_id, getpid(),
-	       (int)join_view->vm_msg.vf_view);
+	printf("VF: Push %d.%d #%d (X#%08x)\n", (int)_node_id, getpid(),
+	       (int)join_view->vm_msg.vf_view, trans);
 #endif
 	/* 
 	 * Encode the package.
@@ -1292,50 +1191,53 @@
 	/*
 	 * Send our message to everyone
 	 */
-	for (x = 0; peer_ctx[x].type != M_NONE; x++) {
-
-		if (msg_send(&peer_ctx[x], join_view, totallen) != totallen) {
-			vf_send_abort(peer_ctx);
-			close_all(peer_ctx);
-
-			free(join_view);
-			clu_unlock(lock_name, lockp);
-			pthread_mutex_unlock(&vf_mutex);
-			return -1;
-		} 
-
-		remain++;
+	if (msg_open(MSG_CLUSTER, 0, _port, &everyone, 0) < 0) {
+		printf("msg_open: fail: %s\n", strerror(errno));
+		return -1;
 	}
 
+	x = msg_send(&everyone, join_view, totallen);
+	if (x < totallen) {
+		vf_send_abort(&everyone, trans);
+#ifdef DEBUG
+		printf("VF: Aborted: Send failed (%d/%d)\n", x, totallen);
+#endif
+		msg_close(&everyone);
+		free(join_view);
+		clu_unlock(&lockp);
+		pthread_mutex_unlock(&vf_mutex);
+		return -1;
+	} 
+
 #ifdef DEBUG
 	printf("VF: Checking for consensus...\n");
 #endif
 	/*
 	 * See if we have a consensus =)
 	 */
-	if ((rv = (vf_unanimous(peer_ctx, remain, VF_COORD_TIMEOUT)))) {
-		vf_send_commit(peer_ctx);
+	if ((rv = (vf_unanimous(&everyone, trans, remain,
+				5)))) {
+		vf_send_commit(&everyone, trans);
+#ifdef DEBUG
+		printf("VF: Consensus reached!\n");
+#endif
 	} else {
-		vf_send_abort(peer_ctx);
+		vf_send_abort(&everyone, trans);
 #ifdef DEBUG
 		printf("VF: Aborted!\n");
 #endif
 	}
 
 	/*
-	 * Clean up
-	 */
-	close_all(peer_fds);
-
-	/*
 	 * unanimous returns 1 for true; 0 for false, so negate it and
 	 * return our value...
 	 */
+	msg_close(&everyone);
 	free(join_view);
-	free(peer_fds);
-	clu_unlock(lock_name, lockp);
+	clu_unlock(&lockp);
 	pthread_mutex_unlock(&vf_mutex);
 
+#ifdef DEBUG
 	if (rv) {
 		getuptime(&end);
 
@@ -1347,11 +1249,10 @@
 		    dif.tv_sec--;
 		}
 
-#ifdef DEBUG
 		printf("VF: Converge Time: %d.%06d\n", (int)dif.tv_sec,
 		       (int)dif.tv_usec);
-#endif
 	}
+#endif
 
 	return (rv?0:-1);
 }
@@ -1378,12 +1279,12 @@
  *			VFR_COMMIT if new views	were committed.  
  */
 static int
-_vf_purge(key_node_t *key_node, msgctx_t **ctx)
+_vf_purge(key_node_t *key_node, uint32_t *trans)
 {
 	view_node_t *cur, *dead = NULL;
 	struct timeval tv;
 
-	*fd = -1;
+	*trans = 0;
 	
 	if (!key_node)
 		return VFR_NO;
@@ -1401,11 +1302,11 @@
 		if (tv_cmp(&tv, &cur->vn_timeout) < 0)
 			continue;
 
-		*ctx = cur->vn_ctx;
-		dead = vn_remove(&key_node->kn_jvlist, *ctx);
+		*trans = cur->vn_transaction;
+		dead = vn_remove(&key_node->kn_jvlist, *trans);
 		free(dead);
 
-		printf("VF: Killed ctx %p\n", *ctx);
+		printf("VF: Killed transaction %08x\n", *trans);
 		/*
 		 * returns the removed associated file descriptor
 		 * so that we can close it and get on with life
@@ -1413,7 +1314,7 @@
 		break;
 	}
 
-	if (*fd == -1)
+	if (*trans == 0)
 		return VFR_NO;
 		
 	if (vf_resolve_views(key_node))
@@ -1425,13 +1326,13 @@
 /**
  * Process a VF message.
  *
- * @param handle	File descriptor on which msgp was received.
+ * @param nodeid	Node id from which msgp was received.
  * @param msgp		Pointer to already-received message.
  * @param nbytes	Length of msgp.
  * @return		-1 on failure, 0 on success.
  */
 int
-vf_process_msg(int handle, generic_msg_hdr *msgp, int nbytes)
+vf_process_msg(msgctx_t *ctx, int nodeid, generic_msg_hdr *msgp, int nbytes)
 {
 	vf_msg_t *hdrp;
 	int ret;
@@ -1440,7 +1341,7 @@
 	    (msgp->gh_command != VF_MESSAGE))
 		return VFR_ERROR;
 
-	switch(msgp->gh_arg1) {
+	switch(vf_command(msgp->gh_arg1)) {
 	case VF_CURRENT:
 #ifdef DEBUG
 		printf("VF: Received request for current data\n");
@@ -1455,7 +1356,7 @@
 		hdrp = (vf_msg_t *)msgp;
 		swab_vf_msg_info_t(&hdrp->vm_msg);
 
-		return vf_send_current(handle, hdrp->vm_msg.vf_keyid);
+		return vf_send_current(ctx, hdrp->vm_msg.vf_keyid);
 	
 	case VF_JOIN_VIEW:
 		/* Validate size... */
@@ -1475,28 +1376,28 @@
 
 			return VFR_ERROR;
 		}
-		return vf_handle_join_view_msg(handle, hdrp);
+		return vf_handle_join_view_msg(ctx, nodeid, hdrp);
 		
 	case VF_ABORT:
-		printf("VF: Received VF_ABORT, fd%d\n", handle);
-		vf_abort(handle);
+		printf("VF: Received VF_ABORT (X#%08x)\n", msgp->gh_arg2);
+		vf_abort(msgp->gh_arg2);
 		return VFR_ABORT;
 		
 	case VF_VIEW_FORMED:
 #ifdef DEBUG
-		printf("VF: Received VF_VIEW_FORMED, fd%d\n",
-		       handle);
+		printf("VF: Received VF_VIEW_FORMED, %d\n",
+		       nodeid);
 #endif
 		pthread_mutex_lock(&key_list_mutex);
-		vf_buffer_commit(handle);
-		ret = (vf_resolve_views(kn_find_fd(handle)) ?
+		vf_buffer_commit(msgp->gh_arg2);
+		ret = (vf_resolve_views(kn_find_trans(msgp->gh_arg2)) ?
 			VFR_COMMIT : VFR_OK);
 		pthread_mutex_unlock(&key_list_mutex);
 		return ret;
-			
+
 	default:
-		printf("VF: Unknown msg type 0x%08x\n",
-		       msgp->gh_arg1);
+		/* Ignore votes and the like from this part */
+		break;
 	}
 
 	return VFR_OK;
@@ -1521,15 +1422,15 @@
 {
 	key_node_t *key_node;
 	char lock_name[256];
-	void *lockp = NULL;
+	struct dlm_lksb lockp;
 	int l;
 
 	/* Obtain cluster lock on it. */
 	pthread_mutex_lock(&vf_mutex);
 	snprintf(lock_name, sizeof(lock_name), "usrm::vf");
-	l = clu_lock_verbose(lock_name, CLK_EX, &lockp);
+	l = clu_lock(LKM_EXMODE, &lockp, 0, lock_name);
 	if (l < 0) {
-		clu_unlock(lock_name, lockp);
+		clu_unlock(&lockp);
 		pthread_mutex_unlock(&vf_mutex);
 		printf("Couldn't lock %s\n", keyid);
 		return l;
@@ -1542,7 +1443,7 @@
 		if (!key_node) {
 			if ((vf_key_init_nt(keyid, 10, NULL, NULL) < 0)) {
 				pthread_mutex_unlock(&key_list_mutex);
-				clu_unlock(lock_name, lockp);
+				clu_unlock(&lockp);
 				pthread_mutex_unlock(&vf_mutex);
 				printf("Couldn't locate %s\n", keyid);
 				return VFR_ERROR;
@@ -1564,7 +1465,7 @@
 		pthread_mutex_unlock(&key_list_mutex);
 
 		if (!membership) {
-			clu_unlock(lock_name, lockp);
+			clu_unlock(&lockp);
 			//printf("Membership NULL, can't find %s\n", keyid);
 			pthread_mutex_unlock(&vf_mutex);
 			return VFR_ERROR;
@@ -1573,7 +1474,7 @@
 		l = vf_request_current(membership, keyid, view, data,
 				       datalen);
 	       	if (l == VFR_NODATA || l == VFR_ERROR) {
-			clu_unlock(lock_name, lockp);
+			clu_unlock(&lockp);
 			//printf("Requesting current failed %s %d\n", keyid, l);
 			pthread_mutex_unlock(&vf_mutex);
 			return l;
@@ -1583,7 +1484,7 @@
 	*data = malloc(key_node->kn_datalen);
 	if (! *data) {
 		pthread_mutex_unlock(&key_list_mutex);
-		clu_unlock(lock_name, lockp);
+		clu_unlock(&lockp);
 		pthread_mutex_unlock(&vf_mutex);
 		printf("Couldn't malloc %s\n", keyid);
 		return VFR_ERROR;
@@ -1594,7 +1495,7 @@
 	*view = key_node->kn_viewno;
 
 	pthread_mutex_unlock(&key_list_mutex);
-	clu_unlock(lock_name, lockp);
+	clu_unlock(&lockp);
 	pthread_mutex_unlock(&vf_mutex);
 
 	return VFR_OK;
@@ -1659,7 +1560,7 @@
 	if (!key_node || !key_node->kn_data || !key_node->kn_datalen) {
 		pthread_mutex_unlock(&key_list_mutex);
 		printf("VFT: No data for keyid %s\n", keyid);
-		return (msg_send_simple(ctx, VF_NACK, 0, 0) != -1)?
+		return (_send_simple(ctx, VF_NACK, 0, 0, 0) != -1)?
 			VFR_OK : VFR_ERROR;
 	}
 
@@ -1670,11 +1571,12 @@
 	msg = build_vf_data_message(VF_ACK, keyid, key_node->kn_data,
 				    key_node->kn_datalen,
 				    key_node->kn_viewno,
+				    0,
 				    &totallen);
 
 	pthread_mutex_unlock(&key_list_mutex);
 	if (!msg)
-		return (msg_send_simple(ctx, VFR_ERROR, 0, 0) != -1)?
+		return (_send_simple(ctx, VFR_ERROR, 0, 0, 0) != -1)?
 			VFR_OK : VFR_ERROR;
 
 	swab_vf_msg_t(msg);
@@ -1733,7 +1635,7 @@
  */
 static int
 vf_request_current(cluster_member_list_t *membership, char *keyid,
-		   int *viewno, void **data, uint32_t *datalen)
+		   uint64_t *viewno, void **data, uint32_t *datalen)
 {
 	int x, n, rv = VFR_OK, port;
 	msgctx_t ctx;
@@ -1761,8 +1663,7 @@
 	swab_vf_msg_info_t(&(msg->vm_msg));
 
 	for (x = 0; x < membership->cml_count; x++) {
-		if (!memb_online(membership,
-				 membership->cml_members[x].cn_nodeid))
+		if (!membership->cml_members[x].cn_member)
 			continue;
 
 		/* Can't request from self. */
@@ -1770,10 +1671,11 @@
 			continue;
 
 		rv = VFR_ERROR;
-		fd = msg_open(membership->cml_members[x].cn_nodeid, port,
-			      &ctx, 15);
-		if (fd == -1)
+		if (msg_open(MSG_CLUSTER,
+			     membership->cml_members[x].cn_nodeid,
+			     port, &ctx, 15) < 0) {
 			continue;
+		}
 
 		msg = &rmsg;
 		//printf("VF: Requesting current value of %s from %d\n",
/cvs/cluster/cluster/rgmanager/src/daemons/rg_event.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/daemons/rg_event.c
+++ -	2006-07-11 23:52:45.033199000 +0000
@@ -0,0 +1,103 @@
+/*
+  Copyright Red Hat, Inc. 2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge, 
+  MA 02139, USA.
+*/
+#include <resgroup.h>
+#include <rg_locks.h>
+#include <gettid.h>
+#include <assert.h>
+#include <libcman.h>
+#include <ccs.h>
+#include <clulog.h>
+
+typedef struct __rge_q {	/* one queued service event */
+	list_head();		/* list linkage macro; definition not shown here */
+	char rg_name[128];	/* service name, copied in by rg_event_q() */
+	uint32_t rg_state;	/* state argument later passed to group_event() */
+	int rg_owner;		/* owner argument later passed to group_event() */
+} rgevent_t;
+
+
+/* Pending service events; both users below take rg_queue_mutex first. */
+static rgevent_t *rg_ev_queue = NULL;
+static pthread_mutex_t rg_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+/* Worker thread id; 0 means no worker is currently running. */
+static pthread_t rg_ev_thread = 0;
+
+/* Defined in groups.c, where it returns int; the prototype must agree. */
+int group_event(char *name, uint32_t state, int owner);
+
+
+/* Worker: drain the event queue, handing each entry to group_event(). */
+void *
+rg_event_thread(void *arg)
+{
+	rgevent_t *cur;
+
+	for (;;) {
+		pthread_mutex_lock(&rg_queue_mutex);
+		cur = rg_ev_queue;
+		if (!cur) {
+			/* Queue empty: clear the thread marker while the
+			   mutex is still held, then exit. */
+			rg_ev_thread = 0;
+			pthread_mutex_unlock(&rg_queue_mutex);
+			return NULL;
+		}
+		list_remove(&rg_ev_queue, cur);
+		pthread_mutex_unlock(&rg_queue_mutex);
+
+		/* Run the handler and free the entry outside the lock. */
+		group_event(cur->rg_name, cur->rg_state, cur->rg_owner);
+		free(cur);
+	}
+}
+
+
+/**
+ * Queue a service event (name is copied, truncated to fit rg_name) and
+ * start the worker thread if none is running.
+ */
+void
+rg_event_q(char *name, uint32_t state, int owner)
+{
+	rgevent_t *ev;
+	pthread_attr_t attrs;
+
+	while ((ev = malloc(sizeof(*ev))) == NULL)
+		sleep(1);	/* out of memory: wait and retry */
+
+	memset(ev, 0, sizeof(*ev));
+	/* Leave the last byte zeroed so rg_name is always terminated */
+	strncpy(ev->rg_name, name, sizeof(ev->rg_name) - 1);
+	ev->rg_state = state;
+	ev->rg_owner = owner;
+
+	pthread_mutex_lock(&rg_queue_mutex);
+	list_insert(&rg_ev_queue, ev);
+	if (rg_ev_thread == 0) {
+		pthread_attr_init(&attrs);
+		pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
+		pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+		pthread_attr_setstacksize(&attrs, 262144);
+		/* On failure, reset so the next event retries the spawn */
+		if (pthread_create(&rg_ev_thread, &attrs, rg_event_thread, NULL))
+			rg_ev_thread = 0;
+		pthread_attr_destroy(&attrs);
+	}
+	pthread_mutex_unlock(&rg_queue_mutex);
+}
--- cluster/rgmanager/src/daemons/Makefile	2006/06/02 17:37:10	1.12
+++ cluster/rgmanager/src/daemons/Makefile	2006/07/11 23:52:41	1.13
@@ -41,8 +41,8 @@
 clurgmgrd: rg_thread.o rg_locks.o main.o groups.o  \
 		rg_queue.o rg_forward.o reslist.o \
 		resrules.o restree.o fo_domain.o nodeevent.o \
-		watchdog.o rg_state.o
-	$(CC) -o $@ $^ $(INCLUDE) $(CFLAGS) $(LDFLAGS) -lccs -lcman 
+		rg_event.o watchdog.o rg_state.o ../clulib/libclulib.a
+	$(CC) -o $@ $^ $(INCLUDE) $(CFLAGS) $(LDFLAGS) -lccs -lcman -lpthread -ldlm
 
 #
 # Our test program links against the local allocator so that
--- cluster/rgmanager/src/daemons/fo_domain.c	2006/06/02 17:37:10	1.8
+++ cluster/rgmanager/src/daemons/fo_domain.c	2006/07/11 23:52:41	1.9
@@ -334,7 +334,7 @@
 	int found = 0;
 	int owned_by_node = 0, started = 0, no_owner = 0;
 	rg_state_t svc_state;
-	void *lockp;
+	struct dlm_lksb lockp;
 
 	ENTER();
 
@@ -414,10 +414,10 @@
 			 */
 			clulog(LOG_WARNING, "Problem getting state information for "
 			       "%s\n", rg_name);
-			rg_unlock(rg_name, lockp);
+			rg_unlock(&lockp);
 			RETURN(FOD_BEST);
 		}
-		rg_unlock(rg_name, lockp);
+		rg_unlock(&lockp);
 
 		/*
 		 * Check to see if the service is started and if we are the owner in case of
--- cluster/rgmanager/src/daemons/groups.c	2006/06/02 17:37:10	1.18
+++ cluster/rgmanager/src/daemons/groups.c	2006/07/11 23:52:41	1.19
@@ -29,9 +29,12 @@
 #include <reslist.h>
 #include <assert.h>
 
-#define cm_svccount cm_pad[0] /* Theses are uint8_t size */
-#define cm_svcexcl  cm_pad[1]
+/* Reuse the address field of cman_node_t for the per-node service counters:
+   we never use the address internally, and the type has no spare space.
+   */
 
+#define cn_svccount cn_address.cna_address[0] /* These are uint8_t size */
+#define cn_svcexcl  cn_address.cna_address[1]
 
 static int config_version = 0;
 static resource_t *_resources = NULL;
@@ -42,6 +45,9 @@
 pthread_mutex_t config_mutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_rwlock_t resource_lock = PTHREAD_RWLOCK_INITIALIZER;
 
+void res_build_name(char *, size_t, resource_t *);
+int get_rg_state_local(char *, rg_state_t *);
+
 
 struct status_arg {
 	msgctx_t *ctx;
@@ -71,17 +77,16 @@
 int
 count_resource_groups(cluster_member_list_t *ml)
 {
-#if 0
 	resource_t *res;
 	char *rgname, *val;
 	int x;
 	rg_state_t st;
-	void *lockp;
+	struct dlm_lksb lockp;
 	cman_node_t *mp;
 
 	for (x = 0; x < ml->cml_count; x++) {
-		ml->cml_members[x].cm_svccount = 0;
-		ml->cml_members[x].cm_svcexcl = 0;
+		ml->cml_members[x].cn_svccount = 0;
+		ml->cml_members[x].cn_svcexcl = 0;
 	}
 
 	pthread_rwlock_rdlock(&resource_lock);
@@ -102,11 +107,11 @@
 		if (get_rg_state(rgname, &st) < 0) {
 			clulog(LOG_ERR, "#34: Cannot get status "
 			       "for service %s\n", rgname);
-			rg_unlock(rgname, lockp);
+			rg_unlock(&lockp);
 			continue;
 		}
 
-		rg_unlock(rgname, lockp);
+		rg_unlock(&lockp);
 
 		if (st.rs_state != RG_STATE_STARTED &&
 		     st.rs_state != RG_STATE_STARTING)
@@ -116,18 +121,17 @@
 		if (!mp)
 			continue;
 
-		++mp->cm_svccount;
+		++mp->cn_svccount;
 
 		val = res_attr_value(res, "exclusive");
 		if (val && ((!strcmp(val, "yes") ||
 				     (atoi(val)>0))) ) {
-			++mp->cm_svcexcl;
+			++mp->cn_svcexcl;
 		}
 
 	} while (!list_done(&_resources, res));
 
 	pthread_rwlock_unlock(&resource_lock);
-#endif
 	return 0;
 }
 
@@ -189,13 +193,13 @@
 
 		if (exclusive) {
 
-			if (0) {//(allowed->cml_members[x].cm_svccount > 0) {
+			if (allowed->cml_members[x].cn_svccount > 0) {
 				/* Definitely not this guy */
 				continue;
 			} else {
 				score += 2;
 			}
-		} else if (0) { //(allowed->cml_members[x].cm_svcexcl) {
+		} else if (allowed->cml_members[x].cn_svcexcl) {
 			/* This guy has an exclusive resource group.
 			   Can't relocate / failover to him. */
 			continue;
@@ -212,6 +216,42 @@
 }
 
 
+/* Dependency check: -1 = no "depend" attribute, 0 = not started, 1 = met */
+int
+check_depend(resource_t *res)
+{
+	rg_state_t dep_state;
+	char *dep_name = res_attr_value(res, "depend");
+
+	if (dep_name == NULL)
+		return -1;
+
+	/* State unavailable: treated the same as a met dependency */
+	if (get_rg_state_local(dep_name, &dep_state) != 0)
+		return 1;
+
+	return (dep_state.rs_state == RG_STATE_STARTED);
+}
+
+
+/* Look up rg_name and run check_depend() on it under the resource read
+   lock.  Returns -1 for an unknown group, else check_depend()'s result. */
+int
+check_depend_safe(char *rg_name)
+{
+	resource_t *res;
+	int ret = -1;
+
+	pthread_rwlock_rdlock(&resource_lock);
+	res = find_root_by_ref(&_resources, rg_name);
+	if (res)	/* on a miss, fall through so the lock is released */
+		ret = check_depend(res);
+	pthread_rwlock_unlock(&resource_lock);
+
+	return ret;
+}
+
+
 /**
   Start or failback a resource group: if it's not running, start it.
   If it is running and we're a better member to run it, then ask for
@@ -224,7 +264,7 @@
 	char *val;
 	cman_node_t *mp;
 	int autostart, exclusive;
-	void *lockp;
+	struct dlm_lksb lockp;
 
 	mp = memb_id_to_p(membership, my_id());
 	assert(mp);
@@ -267,7 +307,7 @@
 			if (get_rg_state(svcName, svcStatus) != 0) {
 				clulog(LOG_ERR, "#34: Cannot get status "
 				       "for service %s\n", svcName);
-				rg_unlock(svcName, lockp);
+				rg_unlock(&lockp);
 				return;
 			}
 
@@ -277,17 +317,25 @@
 				set_rg_state(svcName, svcStatus);
 			}
 
-			rg_unlock(svcName, lockp);
+			rg_unlock(&lockp);
 
 			return;
 		}
 	}
 
+	/* See if service this one depends on is running.  If not,
+           don't start it */
+	if (check_depend(node->rn_resource) == 0) {
+		clulog(LOG_DEBUG,
+		       "Skipping RG %s: Dependency missing\n", svcName);
+		return;
+	}
+
 	val = res_attr_value(node->rn_resource, "exclusive");
 	exclusive = val && ((!strcmp(val, "yes") || (atoi(val)>0)));
 
-	if (0) { //(exclusive && mp->cm_svccount) {
-		clulog(LOG_INFO,
+	if (exclusive && mp->cn_svccount) {
+		clulog(LOG_DEBUG,
 		       "Skipping RG %s: Exclusive and I am running services\n",
 		       svcName);
 		return;
@@ -297,8 +345,8 @@
 	   Don't start other services if I'm running an exclusive
 	   service.
 	 */
-	if (0) { //(mp->cm_svcexcl) {
-		clulog(LOG_INFO,
+	if (mp->cn_svcexcl) {
+		clulog(LOG_DEBUG,
 		       "Skipping RG %s: I am running an exclusive service\n",
 		       svcName);
 		return;
@@ -363,8 +411,8 @@
 int
 eval_groups(int local, uint64_t nodeid, int nodeStatus)
 {
-	void *lockp = NULL;
-	char *svcName, *nodeName;
+	struct dlm_lksb lockp;
+	char svcName[64], *nodeName;
 	resource_node_t *node;
 	rg_state_t svcStatus;
 	cluster_member_list_t *membership;
@@ -385,10 +433,7 @@
 
 	list_do(&_tree, node) {
 
-		if (node->rn_resource->r_rule->rr_root == 0)
-			continue;
-
-		svcName = node->rn_resource->r_attrs->ra_value;
+		res_build_name(svcName, sizeof(svcName), node->rn_resource);
 
 		/*
 		 * Lock the service information and get the current service
@@ -407,11 +452,11 @@
 			clulog(LOG_ERR,
 			       "#34: Cannot get status for service %s\n",
 			       svcName);
-			rg_unlock(svcName, lockp);
+			rg_unlock(&lockp);
 			continue;
 		}
 
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 
 		if (svcStatus.rs_owner == 0)
 			nodeName = "none";
@@ -461,7 +506,7 @@
 	pthread_rwlock_unlock(&resource_lock);
 	free_member_list(membership);
 
-	clulog(LOG_INFO, "Event (%d:%d:%d) Processed\n", local,
+	clulog(LOG_DEBUG, "Event (%d:%d:%d) Processed\n", local,
 	       (int)nodeid, nodeStatus);
 
 	return 0;
@@ -469,6 +514,105 @@
 
 
 /**
+ * Decide what to start or stop locally after a service event.  Returns
+ * -EAGAIN if groups are locked, -1 without a member list, otherwise 0.
+ * @see			eval_groups
+ */
+int
+group_event(char *rg_name, uint32_t state, int owner)
+{
+	char svcName[64], *nodeName;
+	resource_node_t *node;
+	rg_state_t svcStatus;
+	cluster_member_list_t *membership;
+	int depend;	/* check_depend() result: -1 none, 0 unmet, 1 met */
+
+	if (rg_locked()) {
+		clulog(LOG_NOTICE,
+			"Resource groups locked; not evaluating\n");
+		return -EAGAIN;
+	}
+
+	membership = member_list();
+	if (!membership)
+		return -1;
+
+	pthread_rwlock_rdlock(&resource_lock);
+
+	/* Refresh the per-node service counts (requires read lock) */
+	count_resource_groups(membership);
+
+	list_do(&_tree, node) {
+
+		res_build_name(svcName, sizeof(svcName), node->rn_resource);
+
+		if (get_rg_state_local(svcName, &svcStatus) != 0)
+			continue;
+
+		if (svcStatus.rs_owner == 0)
+			nodeName = "none";
+		else
+			nodeName = memb_id_to_name(membership,
+						   svcStatus.rs_owner);
+
+		/* Disabled/failed/in recovery?  Do nothing */
+		if ((svcStatus.rs_state == RG_STATE_DISABLED) ||
+		    (svcStatus.rs_state == RG_STATE_FAILED) ||
+		    (svcStatus.rs_state == RG_STATE_RECOVER)) {
+			continue;
+		}
+
+		depend = check_depend(node->rn_resource);
+
+		/* Skip if this service has no "depend" attribute */
+		if (depend == -1)
+			continue;
+
+		/*
+		   If we have:
+		   (a) a met dependency
+		   (b) we're in the STOPPED state, and
+		   (c) our new service event is a started service
+
+		   Then see if we should start this other service as well.
+		 */
+		if (depend == 1 &&
+		    svcStatus.rs_state == RG_STATE_STOPPED &&
+		    state == RG_STATE_STARTED) {
+
+			clulog(LOG_DEBUG, "Evaluating RG %s, state %s, owner "
+			       "%s\n", svcName,
+			       rg_state_str(svcStatus.rs_state),
+			       nodeName);
+			consider_start(node, svcName, &svcStatus, membership);
+			continue;
+		}
+		
+		/*
+		   If we lost a dependency for this service and it's running
+		   locally, queue a stop request for it.
+		 */
+		if (depend == 0 &&
+		    svcStatus.rs_state == RG_STATE_STARTED &&
+		    svcStatus.rs_owner == my_id()) {
+
+			clulog(LOG_WARNING, "Stopping service %s: Dependency missing\n",
+			       svcName);
+			rt_enqueue_request(svcName, RG_STOP, NULL, 0, my_id(),
+					   0, 0);
+		}
+
+	} while (!list_done(&_tree, node));
+
+	pthread_rwlock_unlock(&resource_lock);
+	free_member_list(membership);
+
+	return 0;
+}
+
+
+
+/**
    Perform an operation on a resource group.  That is, walk down the
    tree for that resource group, performing the given operation on
    all children in the necessary order.
@@ -576,12 +720,11 @@
   @param rgname		Resource group name whose state we want to send.
   @see send_rg_states
  */
-int get_rg_state_local(char *, rg_state_t *);
 void
 send_rg_state(msgctx_t *ctx, char *rgname, int fast)
 {
 	rg_state_msg_t msg, *msgp = &msg;
-	void *lockp;
+	struct dlm_lksb lockp;
 
 	msgp->rsm_hdr.gh_magic = GENERIC_HDR_MAGIC;
 	msgp->rsm_hdr.gh_length = sizeof(msg);
@@ -594,10 +737,10 @@
 		if (rg_lock(rgname, &lockp) < 0)
 			return;
 		if (get_rg_state(rgname, &msgp->rsm_state) < 0) {
-			rg_unlock(rgname, lockp);
+			rg_unlock(&lockp);
 			return;
 		}
-		rg_unlock(rgname, lockp);
+		rg_unlock(&lockp);
 	}
 
 	swab_rg_state_msg_t(msgp);
@@ -616,19 +759,19 @@
 {
 	msgctx_t *ctx = ((struct status_arg *)arg)->ctx;
 	int fast = ((struct status_arg *)arg)->fast;
-	resource_t *res;
+	resource_node_t *node;
 	generic_msg_hdr hdr;
+	char rg[64];
 
 	free(arg);
 
 	pthread_rwlock_rdlock(&resource_lock);
 
-	list_do(&_resources, res) {
-		if (res->r_rule->rr_root == 0)
-			continue;
+	list_do(&_tree, node) {
 
-		send_rg_state(ctx, res->r_attrs[0].ra_value, fast);
-	} while (!list_done(&_resources, res));
+		res_build_name(rg, sizeof(rg), node->rn_resource);
+		send_rg_state(ctx, rg, fast);
+	} while (!list_done(&_tree, node));
 
 	pthread_rwlock_unlock(&resource_lock);
 
@@ -637,8 +780,8 @@
 	/* XXX wait for client to tell us it's done; I don't know why
 	   this is needed when doing fast I/O, but it is. */
 	msg_receive(ctx, &hdr, sizeof(hdr), 10);
-
 	msg_close(ctx);
+	msg_free_ctx(ctx);
 
 	return NULL;
 }
@@ -681,21 +824,20 @@
 int
 svc_exists(char *svcname)
 {
-	resource_t *res;
+	resource_node_t *node;
 	int ret = 0;
+	char rg[64];
 
 	pthread_rwlock_rdlock(&resource_lock);
 
-	list_do(&_resources, res) {
-		if (res->r_rule->rr_root == 0)
-			continue;
+	list_do(&_tree, node) {
+		res_build_name(rg, sizeof(rg), node->rn_resource);
 
-		if (strcmp(res->r_attrs[0].ra_value, 
-			   svcname) == 0) {
+		if (strcmp(rg, svcname) == 0) {
 			ret = 1;
 			break;
 		}
-	} while (!list_done(&_resources, res));
+	} while (!list_done(&_tree, node));
 
 	pthread_rwlock_unlock(&resource_lock);
 
@@ -707,25 +849,22 @@
 rg_doall(int request, int block, char *debugfmt)
 {
 	resource_node_t *curr;
-	char *name;
 	rg_state_t svcblk;
+	char rg[64];
 
 	pthread_rwlock_rdlock(&resource_lock);
 	list_do(&_tree, curr) {
 
-		if (curr->rn_resource->r_rule->rr_root == 0)
-			continue;
-
 		/* Group name */
-		name = curr->rn_resource->r_attrs->ra_value;
+		res_build_name(rg, sizeof(rg), curr->rn_resource);
 
 		if (debugfmt)
-			clulog(LOG_DEBUG, debugfmt, name);
+			clulog(LOG_DEBUG, debugfmt, rg);
 
 		/* Optimization: Don't bother even queueing the request
 		   during the exit case if we don't own it */
 		if (request == RG_STOP_EXITING) {
-			if (get_rg_state_local(name, &svcblk) < 0)
+			if (get_rg_state_local(rg, &svcblk) < 0)
 				continue;
 
 			/* Always run stop if we're the owner, regardless
@@ -734,7 +873,7 @@
 				continue;
 		}
 
-		rt_enqueue_request(name, request, NULL, 0,
+		rt_enqueue_request(rg, request, NULL, 0,
 				   0, 0, 0);
 	} while (!list_done(&_tree, curr));
 
@@ -755,21 +894,17 @@
 q_status_checks(void *arg)
 {
 	resource_node_t *curr;
-	char *name;
 	rg_state_t svcblk;
-	void *lockp;
+	char rg[64];
 
 	pthread_rwlock_rdlock(&resource_lock);
 	list_do(&_tree, curr) {
 
-		if (curr->rn_resource->r_rule->rr_root == 0)
-			continue;
-
 		/* Group name */
-		name = curr->rn_resource->r_attrs->ra_value;
+		res_build_name(rg, sizeof(rg), curr->rn_resource);
 
 		/* Local check - no one will make us take a service */
-		if (get_rg_state_local(name, &svcblk) < 0) {
+		if (get_rg_state_local(rg, &svcblk) < 0) {
 			continue;
 		}
 
@@ -777,7 +912,7 @@
 		    svcblk.rs_state != RG_STATE_STARTED)
 			continue;
 
-		rt_enqueue_request(name, RG_STATUS,
+		rt_enqueue_request(rg, RG_STATUS,
 				   NULL, 0, 0, 0, 0);
 
 	} while (!list_done(&_tree, curr));
@@ -811,24 +946,20 @@
 do_condstops(void)
 {
 	resource_node_t *curr;
-	char *name;
 	rg_state_t svcblk;
 	int need_kill;
-	void *lockp;
+	char rg[64];
 
 	clulog(LOG_INFO, "Stopping changed resources.\n");
 
 	pthread_rwlock_rdlock(&resource_lock);
 	list_do(&_tree, curr) {
 
-		if (curr->rn_resource->r_rule->rr_root == 0)
-			continue;
-
 		/* Group name */
-		name = curr->rn_resource->r_attrs->ra_value;
+		res_build_name(rg, sizeof(rg), curr->rn_resource);
 
 		/* If we're not running it, no need to CONDSTOP */
-		if (get_rg_state_local(name, &svcblk) < 0) {
+		if (get_rg_state_local(rg, &svcblk) < 0) {
 			continue;
 		}
 
@@ -839,10 +970,10 @@
 		need_kill = 0;
 		if (curr->rn_resource->r_flags & RF_NEEDSTOP) {
 			need_kill = 1;
-			clulog(LOG_DEBUG, "Removing %s\n", name);
+			clulog(LOG_DEBUG, "Removing %s\n", rg);
 		}
 
-		rt_enqueue_request(name, need_kill ? RG_DISABLE : RG_CONDSTOP,
+		rt_enqueue_request(rg, need_kill ? RG_DISABLE : RG_CONDSTOP,
 				   NULL, 0, 0, 0, 0);
 
 	} while (!list_done(&_tree, curr));
@@ -859,10 +990,10 @@
 do_condstarts(void)
 {
 	resource_node_t *curr;
-	char *name, *val;
+	char rg[64], *val;
 	rg_state_t svcblk;
 	int need_init, new_groups = 0, autostart;
-	void *lockp;
+	struct dlm_lksb lockp;
 
 	clulog(LOG_INFO, "Starting changed resources.\n");
 
@@ -870,31 +1001,26 @@
 	pthread_rwlock_rdlock(&resource_lock);
 	list_do(&_tree, curr) {
 
-		if (curr->rn_resource->r_rule->rr_root == 0)
-			continue;
-
 		/* Group name */
-		name = curr->rn_resource->r_attrs->ra_value;
+		res_build_name(rg, sizeof(rg), curr->rn_resource);
 
 		/* New RG.  We'll need to initialize it. */
 		need_init = 0;
 		if (curr->rn_resource->r_flags & RF_NEEDSTART)
 			need_init = 1;
 
-		if (get_rg_state_local(name, &svcblk) < 0) {
+		if (get_rg_state_local(rg, &svcblk) < 0)
 			continue;
-		}
 
-		if (!need_init && svcblk.rs_owner != my_id()) {
+		if (!need_init && svcblk.rs_owner != my_id())
 			continue;
-		}
 
 		if (need_init) {
 			++new_groups;
-			clulog(LOG_DEBUG, "Initializing %s\n", name);
+			clulog(LOG_DEBUG, "Initializing %s\n", rg);
 		}
 
-		rt_enqueue_request(name, need_init ? RG_INIT : RG_CONDSTART,
+		rt_enqueue_request(rg, need_init ? RG_INIT : RG_CONDSTART,
 				   NULL, 0, 0, 0, 0);
 
 	} while (!list_done(&_tree, curr));
@@ -909,21 +1035,18 @@
 	pthread_rwlock_rdlock(&resource_lock);
 	list_do(&_tree, curr) {
 
-		if (curr->rn_resource->r_rule->rr_root == 0)
-			continue;
-
 		/* Group name */
-		name = curr->rn_resource->r_attrs->ra_value;
+		res_build_name(rg, sizeof(rg), curr->rn_resource);
 
 		/* New RG.  We'll need to initialize it. */
 		if (!(curr->rn_resource->r_flags & RF_NEEDSTART))
 			continue;
 
-		if (rg_lock(name, &lockp) != 0)
+		if (rg_lock(rg, &lockp) != 0)
 			continue;
 
-		if (get_rg_state(name, &svcblk) < 0) {
-			rg_unlock(name, lockp);
+		if (get_rg_state(rg, &svcblk) < 0) {
+			rg_unlock(&lockp);
 			continue;
 		}
 
@@ -933,7 +1056,7 @@
 		   a truly new service, it will be in the UNINITIALIZED
 		   state, which will be caught by eval_groups. */
 		if (svcblk.rs_state != RG_STATE_DISABLED) {
-			rg_unlock(name, lockp);
+			rg_unlock(&lockp);
 			continue;
 		}
 
@@ -946,9 +1069,9 @@
 		else
 			svcblk.rs_state = RG_STATE_DISABLED;
 
-		set_rg_state(name, &svcblk);
+		set_rg_state(rg, &svcblk);
 
-		rg_unlock(name, lockp);
+		rg_unlock(&lockp);
 
 	} while (!list_done(&_tree, curr));
 	pthread_rwlock_unlock(&resource_lock);
--- cluster/rgmanager/src/daemons/main.c	2006/06/02 17:37:10	1.25
+++ cluster/rgmanager/src/daemons/main.c	2006/07/11 23:52:41	1.26
@@ -31,21 +31,23 @@
 #include <members.h>
 #include <msgsimple.h>
 #include <vf.h>
+#include <lock.h>
 #include <rg_queue.h>
 #include <malloc.h>
+#include <cman-private.h>
 
 #define L_SYS (1<<1)
 #define L_USER (1<<0)
 
-int configure_logging(int ccsfd);
+int configure_logging(int ccsfd, int debug);
 
+void node_event(int, uint64_t, int);
 void node_event_q(int, uint64_t, int);
 int daemon_init(char *);
 int init_resource_groups(int);
 void kill_resource_groups(void);
-void set_my_id(uint64_t);
+void set_my_id(int);
 int eval_groups(int, uint64_t, int);
-void graceful_exit(int);
 void flag_shutdown(int sig);
 void hard_exit(void);
 int send_rg_states(msgctx_t *, int);
@@ -58,6 +60,7 @@
 static int signalled = 0;
 
 uint64_t next_node_id(cluster_member_list_t *membership, uint64_t me);
+void rg_event_q(char *svcName, uint32_t state, int owner); /* void in rg_event.c */
 
 
 void
@@ -74,49 +77,9 @@
 
 
 int
-send_exit_msg(uint64_t nodeid)
+send_exit_msg(msgctx_t *ctx)
 {
-	msgctx_t ctx;
-
-	if (msg_open(nodeid, RG_PORT, &ctx, 5) < 0) {
-		printf("Failed to send exit message\n");
-		return -1;
-	}
-	msg_send_simple(&ctx, RG_EXITING, 0, 0);
-	msg_close(&ctx);
-
-	return 0;
-}
-
-
-/**
-  Notify other resource group managers that we're leaving, since
-  cluster membership is not necessarily tied to the members running
-  the rgmgr.
- */
-int
-notify_exiting(void)
-{
-	int x;
-	uint64_t partner;
-	cluster_member_list_t *membership;
-
-	membership = member_list();
-	if (!membership)
-		return 0;
-
-	for (x = 0; x < membership->cml_count; x++) {
-
-		partner = membership->cml_members[x].cn_nodeid;
-
-		if (partner == my_id() ||
-		    !membership->cml_members[x].cn_member)
-			continue;
-
-		send_exit_msg(partner);
-	}
-
-	free_member_list(membership);
+	msg_send_simple(ctx, RG_EXITING, my_id(), 0);
 
 	return 0;
 }
@@ -130,60 +93,6 @@
 
 
 /**
-  Called to handle the transition of a cluster member from up->down or
-  down->up.  This handles initializing services (in the local node-up case),
-  exiting due to loss of quorum (local node-down), and service fail-over
-  (remote node down).
- 
-  @param nodeID		ID of the member which has come up/gone down.
-  @param nodeStatus		New state of the member in question.
-  @see eval_groups
- */
-void
-node_event(int local, uint64_t nodeID, int nodeStatus)
-{
-	if (!running)
-		return;
-
-	if (local) {
-
-		/* Local Node Event */
-		if (nodeStatus == 0)
-			hard_exit();
-
-		if (!rg_initialized()) {
-			if (init_resource_groups(0) != 0) {
-				clulog(LOG_ERR,
-				       "#36: Cannot initialize services\n");
-				hard_exit();
-			}
-		}
-
-		if (shutdown_pending) {
-			clulog(LOG_NOTICE, "Processing delayed exit signal\n");
-			graceful_exit(SIGINT);
-		}
-		setup_signal(SIGINT, graceful_exit);
-		setup_signal(SIGTERM, graceful_exit);
-		setup_signal(SIGHUP, flag_reconfigure);
-
-		eval_groups(1, nodeID, 1);
-		return;
-	}
-
-	/*
-	 * Nothing to do for events from other nodes if we are not ready.
-	 */
-	if (!rg_initialized()) {
-		clulog(LOG_DEBUG, "Services not initialized.\n");
-		return;
-	}
-
-	eval_groups(0, nodeID, nodeStatus);
-}
-
-
-/**
   This updates our local membership view and handles whether or not we
   should exit, as well as determines node transitions (thus, calling
   node_event()).
@@ -192,22 +101,43 @@
   @return			0
  */
 int
-membership_update(chandle_t *clu)
+membership_update(void)
 {
-	cluster_member_list_t *new_ml, *node_delta, *old_membership;
+	cluster_member_list_t *new_ml = NULL, *node_delta = NULL,
+			      *old_membership = NULL;
 	int		x;
 	int		me = 0;
+	cman_handle_t 	h;
+	int 		quorate;
 
-	if (!rg_quorate())
-		return 0;
+	h = cman_init(NULL);
+	quorate = cman_is_quorate(h);
+	if (!quorate) {
+		cman_finish(h);
+
+		if (!rg_quorate())
+			return -1;
+
+		clulog(LOG_EMERG, "#1: Quorum Dissolved\n");
+		rg_set_inquorate();
+		member_list_update(NULL);/* Clear member list */
+		rg_lockall(L_SYS);
+		rg_doall(RG_INIT, 1, "Emergency stop of %s");
+		rg_set_uninitialized();
+		return -1;
+	} else if (!rg_quorate()) {
 
-	clulog(LOG_INFO, "Magma Event: Membership Change\n");
+		rg_set_quorate();
+		rg_unlockall(L_SYS);
+		rg_unlockall(L_USER);
+		clulog(LOG_NOTICE, "Quorum Formed\n");
+	}
 
 	old_membership = member_list();
-	new_ml = get_member_list(clu->c_cluster);
+	new_ml = get_member_list(h);
 	member_list_update(new_ml);
+	cman_finish(h);
 
-	clulog(LOG_DEBUG, "I am node #%lld\n", my_id());
 
 	/*
 	 * Handle nodes lost.  Do our local node event first.
@@ -218,7 +148,8 @@
 	if (me) {
 		/* Should not happen */
 		clulog(LOG_INFO, "State change: LOCAL OFFLINE\n");
-		free_member_list(node_delta);
+		if (node_delta)
+			free_member_list(node_delta);
 		node_event(1, my_id(), 0);
 		/* NOTREACHED */
 	}
@@ -238,7 +169,6 @@
 		}
 	}
 
-	/* Free nodes */
 	free_member_list(node_delta);
 
 	/*
@@ -263,8 +193,7 @@
 
 		clulog(LOG_INFO, "State change: %s UP\n",
 		       node_delta->cml_members[x].cn_name);
-		node_event_q(0, node_delta->cml_members[x].cn_nodeid,
-			     1);
+		node_event_q(0, node_delta->cml_members[x].cn_nodeid, 1);
 	}
 
 	free_member_list(node_delta);
@@ -309,10 +238,61 @@
 }
 
 
-int
+#if 0
+struct lr_arg {
+	msgctx_t *ctx;
+	int req;
+};
+
+
+/* Thread body: publish the requested lock state with vf_write(), then
+   reply on the request's context and release everything passed in. */
+void *
+lockreq_th(void *a)
+{
+	struct lr_arg *lr = (struct lr_arg *)a;
+	cluster_member_list_t *members = member_list();
+	char locked = (lr->req == RG_LOCK) ? 1 : 0;
+	int rv;
+
+	rv = vf_write(members, VFF_IGN_CONN_ERRORS, "rg_lockdown", &locked, 1);
+	free_member_list(members);
+
+	/* RG_SUCCESS only when the write returned 0 */
+	msg_send_simple(lr->ctx, (rv == 0) ? RG_SUCCESS : RG_FAIL, 0, 0);
+
+	/* Done with the context and the argument block */
+	msg_close(lr->ctx);
+	msg_free_ctx(lr->ctx);
+	free(lr);
+
+	return NULL;
+}
+
+
+/* Threaded variant (disabled): hand the lock request to lockreq_th(). */
+void
+do_lockreq(msgctx_t *ctx, int req)
+{
+	pthread_t th;
+	struct lr_arg *arg = malloc(sizeof(*arg));
+
+	if (arg) {
+		arg->ctx = ctx;
+		arg->req = req;
+		if (pthread_create(&th, NULL, lockreq_th, (void *)arg) == 0)
+			return;	/* lockreq_th() now owns ctx and arg */
+		free(arg);
+	}
+	/* No worker could be started: fail the request here */
+	msg_send_simple(ctx, RG_FAIL, 0, 0);
+	msg_close(ctx);
+	msg_free_ctx(ctx);
+}
+#else
+void 
 do_lockreq(msgctx_t *ctx, int req)
 {
-#if 0
 	int ret;
 	char state;
 	cluster_member_list_t *m = member_list();
@@ -326,9 +306,9 @@
 	} else {
 		msg_send_simple(ctx, RG_FAIL, 0, 0);
 	}
-#endif
-	return 0;
 }
+#endif
+
 
 
 /**
@@ -341,20 +321,35 @@
  * @see			quorum_msg
  */
 int
-dispatch_msg(msgctx_t *ctx, uint64_t nodeid)
+dispatch_msg(msgctx_t *ctx, int nodeid, int need_close)
 {
-	int ret = -1;
+	int ret = 0, sz = -1;
 	char msgbuf[4096];
-	generic_msg_hdr	*msg_hdr = (generic_msg_hdr *)msg_hdr;
+	generic_msg_hdr	*msg_hdr = (generic_msg_hdr *)msgbuf;
 	SmMessageSt	*msg_sm = (SmMessageSt *)msgbuf;
 
+	memset(msgbuf, 0, sizeof(msgbuf));
+
 	/* Peek-a-boo */
-	ret = msg_receive(ctx, msg_hdr, sizeof(msgbuf), 10);
-	if (ret < sizeof (generic_msg_hdr)) {
-		clulog(LOG_ERR, "#37: Error receiving message header\n");
+	sz = msg_receive(ctx, msg_hdr, sizeof(msgbuf), 10);
+	if (sz < sizeof (generic_msg_hdr)) {
+		clulog(LOG_ERR, "#37: Error receiving message header (%d)\n", sz);
 		goto out;
 	}
 
+	if (sz < 0)
+		return -1;
+
+	if (sz > sizeof(msgbuf)) {
+		raise(SIGSTOP);
+	}
+
+	/*
+	printf("RECEIVED %d %d %d %p\n", sz, (int)sizeof(msgbuf),
+	       (int)sizeof(generic_msg_hdr), ctx);
+	msg_print(ctx);
+	 */
+
 	/* Decode the header */
 	swab_generic_msg_hdr(msg_hdr);
 	if ((msg_hdr->gh_magic != GENERIC_HDR_MAGIC)) {
@@ -364,28 +359,23 @@
 		goto out;
 	}
 
-	if (msg_hdr->gh_length != ret) {
+	if (msg_hdr->gh_length != sz) {
 		clulog(LOG_ERR, "#XX: Read size mismatch: %d %d\n",
 		       ret, msg_hdr->gh_length);
 		goto out;
 	}
 
-	ret = 0;
 	switch (msg_hdr->gh_command) {
 	case RG_STATUS:
-		clulog(LOG_DEBUG, "Sending service states to ctx%p\n",ctx);
+		clulog(LOG_DEBUG, "Sending service states to CTX%p\n",ctx);
 		send_rg_states(ctx, msg_hdr->gh_arg1);
+		need_close = 0;
 		break;
 
 	case RG_LOCK:
-		if (rg_quorate())
-			do_lockreq(ctx, RG_LOCK);
-		msg_close(ctx);
-		break;
-
 	case RG_UNLOCK:
 		if (rg_quorate())
-			do_lockreq(ctx, RG_UNLOCK);
+			do_lockreq(ctx, msg_hdr->gh_command);
 		break;
 
 	case RG_QUERY_LOCK:
@@ -395,19 +385,18 @@
 		}
 		break;
 
-
 	case RG_ACTION_REQUEST:
 
-		if (ret != sizeof(msg_sm)) {
+		if (sz < sizeof(msg_sm)) {
 			clulog(LOG_ERR,
-			       "#39: Error receiving entire request\n");
+			       "#39: Error receiving entire request (%d/%d)\n",
+			       ret, (int)sizeof(msg_sm));
 			ret = -1;
 			goto out;
 		}
 
 		/* XXX perf: reencode header */
 		swab_generic_msg_hdr(msg_hdr);
-
 		/* Decode SmMessageSt message */
 		swab_SmMessageSt(msg_sm);
 
@@ -430,21 +419,47 @@
 		  		   ctx, 0, msg_sm->sm_data.d_svcOwner, 0, 0);
 		return 0;
 
+	case RG_EVENT:
+		/* Service event.  Run a dependency check */
+		if (sz < sizeof(msg_sm)) {
+			clulog(LOG_ERR,
+			       "#39: Error receiving entire request (%d/%d)\n",
+			       ret, (int)sizeof(msg_sm));
+			ret = -1;
+			goto out;
+		}
+
+		/* XXX perf: reencode header */
+		swab_generic_msg_hdr(msg_hdr);
+		/* Decode SmMessageSt message */
+		swab_SmMessageSt(msg_sm);
+
+		/* Send to our rg event handler */
+		rg_event_q(msg_sm->sm_data.d_svcName,
+			   msg_sm->sm_data.d_action,
+			   msg_sm->sm_data.d_svcOwner);
+		break;
+
 	case RG_EXITING:
 
 		clulog(LOG_NOTICE, "Member %d is now offline\n", (int)nodeid);
-
 		node_event(0, nodeid, 0);
 		break;
 
+	case VF_MESSAGE:
+		/* Ignore; our VF thread handles these */
+		break;
+
 	default:
 		clulog(LOG_DEBUG, "unhandled message request %d\n",
 		       msg_hdr->gh_command);
 		break;
 	}
-
 out:
-	msg_close(ctx);
+	if (need_close) {
+		msg_close(ctx);
+		msg_free_ctx(ctx);
+	}
 	return ret;
 }
 
@@ -455,47 +470,65 @@
   @return		Event
  */
 int
-handle_cluster_event(chandle_t *clu, msgctx_t *ctx)
+handle_cluster_event(msgctx_t *ctx)
 {
 	int ret;
+	msgctx_t *newctx;
+	int nodeid;
 	
 	ret = msg_wait(ctx, 0);
 
+
 	switch(ret) {
 	case M_PORTOPENED:
+		msg_receive(ctx, NULL, 0, 0);
+		clulog(LOG_DEBUG, "Event: Port Opened\n");
+		membership_update();
+		break;
 	case M_PORTCLOSED:
 		/* Might want to handle powerclosed like membership change */
+		msg_receive(ctx, NULL, 0, 0);
+		clulog(LOG_DEBUG, "Event: Port Closed\n");
+		membership_update();
+		break;
 	case M_NONE:
+		msg_receive(ctx, NULL, 0, 0);
 		clulog(LOG_DEBUG, "NULL cluster message\n");
 		break;
 	case M_OPEN:
+		newctx = msg_new_ctx();
+		if (msg_accept(ctx, newctx) >= 0 &&
+		    rg_quorate()) {
+			/* Handle message */
+			/* When request completes, the fd is closed */
+			nodeid = msg_get_nodeid(newctx);
+			dispatch_msg(newctx, nodeid, 1);
+			break;
+		}
+		break;
+
+	case M_DATA:
+		dispatch_msg(ctx, nodeid, 0);
+		break;
+		
 	case M_OPEN_ACK:
 	case M_CLOSE:
 		clulog(LOG_DEBUG, "I should NOT get here: %d\n",
 		       ret);
 		break;
 	case M_STATECHANGE:
+		msg_receive(ctx, NULL, 0, 0);
 		clulog(LOG_DEBUG, "Membership Change Event\n");
 		if (rg_quorate() && running) {
 			rg_unlockall(L_SYS);
-			membership_update(clu);
+			membership_update();
 		}
 		break;
-		rg_set_quorate();
-		rg_unlockall(L_SYS);
-		rg_unlockall(L_USER);
-		clulog(LOG_NOTICE, "Quorum Achieved\n");
-		membership_update(clu);
+	case 998:
 		break;
 	case 999:
-		clulog(LOG_EMERG, "#1: Quorum Dissolved\n");
-		rg_set_inquorate();
-		member_list_update(NULL);		/* Clear member list */
-		rg_lockall(L_SYS);
-		rg_doall(RG_INIT, 1, "Emergency stop of %s");
-		rg_set_uninitialized();
-		break;
 	case M_TRY_SHUTDOWN:
+		msg_receive(ctx, NULL, 0, 0);
 		clulog(LOG_WARNING, "#67: Shutting down uncleanly\n");
 		rg_set_inquorate();
 		rg_doall(RG_INIT, 1, "Emergency stop of %s");
@@ -512,15 +545,13 @@
 void dump_threads(void);
 
 int
-event_loop(chandle_t *clu)
+event_loop(msgctx_t *localctx, msgctx_t *clusterctx)
 {
 	int n, max, ret;
 	fd_set rfds;
-	msgctx_t newctx;
+	msgctx_t *newctx;
 	struct timeval tv;
 	int nodeid;
-	msgctx_t *localctx = clu->local_ctx;
-	msgctx_t *clusterctx = clu->cluster_ctx;
 
 	tv.tv_sec = 10;
 	tv.tv_usec = 0;
@@ -545,7 +576,8 @@
 			break;
 
 		if (msg_fd_isset(clusterctx, &rfds)) {
-			handle_cluster_event(clu, clusterctx);
+			msg_fd_clr(clusterctx, &rfds);
+			handle_cluster_event(clusterctx);
 			continue;
 		}
 
@@ -553,7 +585,9 @@
 			continue;
 		}
 
-		ret = msg_accept(localctx, &newctx);
+		msg_fd_clr(localctx, &rfds);
+		newctx = msg_new_ctx();
+		ret = msg_accept(localctx, newctx);
 
 		if (ret == -1)
 			continue;
@@ -561,25 +595,27 @@
 		if (rg_quorate()) {
 			/* Handle message */
 			/* When request completes, the fd is closed */
-			nodeid = msg_get_nodeid(&newctx);
-			dispatch_msg(&newctx, nodeid);
+			nodeid = msg_get_nodeid(newctx);
+			dispatch_msg(newctx, nodeid, 1);
 			continue;
 		}
 			
 		if (!rg_initialized()) {
-			msg_close(&newctx);
+			msg_close(newctx);
+			msg_free_ctx(newctx);
 			continue;
 		}
 
 		if (!rg_quorate()) {
 			printf("Dropping connect: NO QUORUM\n");
-			msg_close(&newctx);
+			msg_close(newctx);
+			msg_free_ctx(newctx);
 		}
 	}
 
 	if (need_reconfigure || check_config_update()) {
 		need_reconfigure = 0;
-		configure_logging(-1);
+		configure_logging(-1, 0);
 		init_resource_groups(1);
 		return 0;
 	}
@@ -606,13 +642,6 @@
 
 
 void
-graceful_exit(int sig)
-{
-	running = 0;
-}
-
-
-void
 hard_exit(void)
 {
 	rg_lockall(L_SYS);
@@ -625,12 +654,9 @@
 void
 cleanup(msgctx_t *clusterctx)
 {
-	rg_lockall(L_SYS);
-	rg_doall(RG_STOP_EXITING, 1, NULL);
-	//vf_shutdown();
 	kill_resource_groups();
 	member_list_update(NULL);
-	notify_exiting();
+	send_exit_msg(clusterctx);
 }
 
 
@@ -649,7 +675,7 @@
  * Configure logging based on data in cluster.conf
  */
 int
-configure_logging(int ccsfd)
+configure_logging(int ccsfd, int dbg)
 {
 	char *v;
 	char internal = 0;
@@ -667,7 +693,8 @@
 	}
 
 	if (ccs_get(ccsfd, "/cluster/rm/@log_level", &v) == 0) {
-		clu_set_loglevel(atoi(v));
+		if (!dbg)
+			clu_set_loglevel(atoi(v));
 		free(v);
 	}
 
@@ -679,20 +706,21 @@
 
 
 void
-clu_initialize(chandle_t *clu)
+clu_initialize(cman_handle_t **ch)
 {
-	cman_node_t me;
+	if (!ch)
+		exit(1);
 
-	clu->c_cluster = cman_init(NULL);
-	if (!clu->c_cluster) {
+	*ch = cman_init(NULL);
+	if (!(*ch)) {
 		clulog(LOG_NOTICE, "Waiting for CMAN to start\n");
 
-		while (!(clu->c_cluster = cman_init(NULL))) {
+		while (!(*ch = cman_init(NULL))) {
 			sleep(1);
 		}
 	}
 
-        if (!cman_is_quorate(clu->c_cluster)) {
+        if (!cman_is_quorate(*ch)) {
 		/*
 		   There are two ways to do this; this happens to be the simpler
 		   of the two.  The other method is to join with a NULL group 
@@ -701,14 +729,11 @@
 		 */
 		clulog(LOG_NOTICE, "Waiting for quorum to form\n");
 
-		while (cman_is_quorate(clu->c_cluster) == 0) {
+		while (cman_is_quorate(*ch) == 0) {
 			sleep(1);
 		}
 		clulog(LOG_NOTICE, "Quorum formed, starting\n");
 	}
-
-        cman_get_node(clu->c_cluster, CMAN_NODEID_US, &me);
-        clu->c_nodeid = me.cn_nodeid;
 }
 
 
@@ -722,15 +747,28 @@
 }
 
 
+/**
+  Thread body created by main() once shutdown_pending has been flagged.
+  Runs rg_doall(RG_STOP_EXITING, 1, NULL) and afterwards clears the global
+  'running' flag, which is the condition main()'s event loop spins on.
+
+  @param arg		Unused.
+  @return		Always NULL.
+ */
+void *
+shutdown_thread(void *arg)
+{
+	(void)arg;
+
+	rg_doall(RG_STOP_EXITING, 1, NULL);
+
+	/* Only drop 'running' after the stop pass has returned */
+	running = 0;
+
+	return NULL;
+}
+
+
 int
 main(int argc, char **argv)
 {
 	int rv;
 	char foreground = 0;
-	int quorate;
-	int listen_fds[2], listeners;
-	int myNodeID;
-	chandle_t clu;
+	cman_node_t me;
+	msgctx_t *cluster_ctx;
+	msgctx_t *local_ctx;
+	int port = RG_PORT;
+	pthread_t th;
+
+	cman_handle_t *clu = NULL;
 
 	while ((rv = getopt(argc, argv, "fd")) != EOF) {
 		switch (rv) {
@@ -759,13 +797,25 @@
 	}
 
 	clu_initialize(&clu);
-	set_my_id(clu.c_nodeid);
+	if (cman_init_subsys(clu) < 0) {
+		perror("cman_init_subsys");
+		return -1;
+	}
+
+	if (clu_lock_init("rgmanager") != 0) {
+		printf("Locks not working!\n");
+		return -1;
+	}
+
+	memset(&me, 0, sizeof(me));
+        cman_get_node(clu, CMAN_NODEID_US, &me);
+	set_my_id(me.cn_nodeid);
 
 	/*
 	   We know we're quorate.  At this point, we need to
 	   read the resource group trees from ccsd.
 	 */
-	configure_logging(-1);
+	configure_logging(-1, debug);
 	clulog(LOG_NOTICE, "Resource Group Manager Starting\n");
 
 	if (init_resource_groups(0) != 0) {
@@ -778,46 +828,60 @@
 	setup_signal(SIGUSR1, statedump);
 	unblock_signal(SIGCHLD);
 	setup_signal(SIGPIPE, SIG_IGN);
+
 	if (debug) {
 		setup_signal(SIGSEGV, segfault);
 	} else {
 		unblock_signal(SIGSEGV);
 	}
 
-	if (msg_init(&clu) < 0) {
-		clulog(LOG_CRIT, "#10: Couldn't set up message system\n");
+	if (msg_listen(MSG_SOCKET, RGMGR_SOCK, me.cn_nodeid, &local_ctx) < 0) {
+		clulog(LOG_CRIT,
+		       "#10: Couldn't set up cluster message system: %s\n",
+		       strerror(errno));
+		return -1;
+	}
+
+	if (msg_listen(MSG_CLUSTER, &port , me.cn_nodeid, &cluster_ctx) < 0) {
+		clulog(LOG_CRIT,
+		       "#10b: Couldn't set up cluster message system: %s\n",
+		       strerror(errno));
 		return -1;
 	}
 
 	rg_set_quorate();
-	set_my_id(clu.c_nodeid);
+
+	/*
+	msg_print(local_ctx);
+	msg_print(cluster_ctx);
+	 */
 
 	/*
 	   Initialize the VF stuff.
 	 */
-#if 0
-	if (vf_init(clu.c_nodeid, RG_VF_PORT, NULL, NULL) != 0) {
+	if (vf_init(me.cn_nodeid, RG_PORT, NULL, NULL) != 0) {
 		clulog(LOG_CRIT, "#11: Couldn't set up VF listen socket\n");
 		return -1;
 	}
 
 	vf_key_init("rg_lockdown", 10, NULL, lock_commit_cb);
-#endif
-
-	/*
-	   Get an initial membership view.
-	 */
-	membership_update(&clu);
 
 	/*
 	   Do everything useful
 	 */
-	while (running)
-		event_loop(&clu);
+	while (running) {
+		event_loop(local_ctx, cluster_ctx);
+
+		if (shutdown_pending == 1) {
+			++shutdown_pending;
+			clulog(LOG_NOTICE, "Shutting down\n");
+			pthread_create(&th, NULL, shutdown_thread, NULL);
+		}
+	}
 
-	clulog(LOG_NOTICE, "Shutting down\n");
-	cleanup(&clu);
+	cleanup(cluster_ctx);
 	clulog(LOG_NOTICE, "Shutdown complete, exiting\n");
+	cman_finish(clu);
 	
 	/*malloc_dump_table(); */ /* Only works if alloc.c us used */
 	/*malloc_stats();*/
--- cluster/rgmanager/src/daemons/nodeevent.c	2006/06/02 17:37:10	1.2
+++ cluster/rgmanager/src/daemons/nodeevent.c	2006/07/11 23:52:41	1.3
@@ -20,6 +20,9 @@
 #include <rg_locks.h>
 #include <gettid.h>
 #include <assert.h>
+#include <libcman.h>
+#include <ccs.h>
+#include <clulog.h>
 
 typedef struct __ne_q {
 	list_head();
@@ -28,8 +31,6 @@
 	int ne_state;
 } nevent_t;
 
-int node_event(int, uint64_t, int);
-
 /**
  * Node event queue.
  */
@@ -38,11 +39,120 @@
 static pthread_t ne_thread = 0;
 int ne_queue_request(int local, uint64_t nodeid, int state);
 
+void hard_exit(void);
+int init_resource_groups(int);
+void flag_shutdown(int sig);
+void flag_reconfigure(int sig);
+
+extern int running;
+extern int shutdown_pending;
+
+
+/**
+  Called to handle the transition of a cluster member from up->down or
+  down->up.  For the local node this calls hard_exit() on node-down,
+  initializes resource groups if that has not happened yet, (re)installs
+  the SIGINT/SIGTERM/SIGHUP handlers and evaluates groups.  For a remote
+  node it only evaluates groups, and only once services are initialized.
+  Does nothing at all when the daemon is no longer 'running'.
+
+  @param local		Nonzero if the event concerns the local node.
+  @param nodeID		ID of the member which has come up/gone down.
+  @param nodeStatus		New state of the member in question; 0 is
+ 				treated as "down" for the local node.
+  @see eval_groups
+ */
+void
+node_event(int local, uint64_t nodeID, int nodeStatus)
+{
+	if (!running)
+		return;
+
+	if (!local) {
+		/*
+		 * Nothing to do for events from other nodes if we are
+		 * not ready.
+		 */
+		if (rg_initialized()) {
+			eval_groups(0, nodeID, nodeStatus);
+		} else {
+			clulog(LOG_DEBUG, "Services not initialized.\n");
+		}
+		return;
+	}
+
+	/* Local Node Event */
+	if (nodeStatus == 0)
+		hard_exit();
+
+	if (!rg_initialized() && init_resource_groups(0) != 0) {
+		clulog(LOG_ERR, "#36: Cannot initialize services\n");
+		hard_exit();
+	}
+
+	/* A shutdown request that arrived earlier is honoured now */
+	if (shutdown_pending) {
+		clulog(LOG_NOTICE, "Processing delayed exit signal\n");
+		running = 0;
+	}
+
+	setup_signal(SIGINT, flag_shutdown);
+	setup_signal(SIGTERM, flag_shutdown);
+	setup_signal(SIGHUP, flag_reconfigure);
+
+	eval_groups(1, nodeID, 1);
+}
+
+
+/**
+  Query ccsd for a fence device name configured for the given node.
+
+  @param nodeid		Node ID to look up in cluster.conf.
+  @return		1 if ccs_get() found a fence device name for the
+ 			node, or if ccsd could not be contacted (fencing is
+ 			then assumed); 0 if the lookup failed.
+ */
+int
+node_has_fencing(int nodeid)
+{
+	int ccs_desc;
+	char *val = NULL;
+	char buf[1024];
+	int ret = 1;
+	
+	ccs_desc = ccs_connect();
+	if (ccs_desc < 0) {
+		clulog(LOG_ERR, "Unable to connect to ccsd; cannot handle"
+		       " node event!\n");
+		/* Assume node has fencing */
+		return 1;
+	}
+
+	/*
+	 * nodeid is an attribute of <clusternode>, so the predicate has to
+	 * use the attribute axis (@nodeid).  A bare "nodeid" would test a
+	 * child element instead and never match, making every node look
+	 * like it has no fencing.
+	 */
+	snprintf(buf, sizeof(buf), 
+		 "/cluster/clusternodes/clusternode[@nodeid=\"%d\"]"
+		 "/fence/method/device/@name", nodeid);
+
+	if (ccs_get(ccs_desc, buf, &val) != 0)
+		ret = 0;
+
+	/* free(NULL) is a no-op, so no guard is needed */
+	free(val);
+	ccs_disconnect(ccs_desc);
+	return ret;
+}
+
+
+/**
+  Ask CMAN whether the given node has been fenced.  A private CMAN handle
+  is opened for the query and closed again before returning.
+
+  @param nodeid		Node ID to query.
+  @return		The 'fenced' flag reported by cman_get_fenceinfo(),
+ 			or 0 if CMAN could not be contacted or the query
+ 			failed.
+ */
+int
+node_fenced(int nodeid)
+{
+	cman_handle_t ch;
+	int fenced = 0;
+	uint64_t fence_time;
+
+	ch = cman_init(NULL);
+	if (!ch) {
+		/*
+		 * cman_init() can fail.  Never hand a NULL handle to the
+		 * calls below; report "not fenced" so that the polling
+		 * caller simply tries again later.
+		 */
+		return 0;
+	}
+
+	if (cman_get_fenceinfo(ch, nodeid, &fence_time, &fenced, NULL) < 0)
+		fenced = 0;
+
+	cman_finish(ch);
+
+	return fenced;
+}
+
 
 void *
 node_event_thread(void *arg)
 {
 	nevent_t *ev;
+	int notice;
 
 	while (1) {
 		pthread_mutex_lock(&ne_queue_mutex);
@@ -53,6 +163,22 @@
 			break; /* We're outta here */
 		pthread_mutex_unlock(&ne_queue_mutex);
 
+		if (ev->ne_state == 0 && node_has_fencing(ev->ne_nodeid)) {
+			notice = 0;
+			while (!node_fenced(ev->ne_nodeid)) {
+				if (!notice) {
+					notice = 1;
+					clulog(LOG_INFO, "Waiting for "
+					       "node #%d to be fenced\n",
+					       ev->ne_nodeid);
+				}
+				sleep(2);
+			}
+			if (notice)
+				clulog(LOG_INFO, "Node #%d fenced; "
+				       "continuing\n", ev->ne_nodeid);
+		}
+
 		node_event(ev->ne_local, ev->ne_nodeid, ev->ne_state);
 
 		free(ev);
@@ -60,7 +186,6 @@
 
 	/* Mutex held */
 	ne_thread = 0;
-	rg_dec_threads();
 	pthread_mutex_unlock(&ne_queue_mutex);
 	return NULL;
 }
@@ -96,8 +221,6 @@
 
 		pthread_create(&ne_thread, &attrs, node_event_thread, NULL);
         	pthread_attr_destroy(&attrs);
-
-		rg_inc_threads();
 	}
 	pthread_mutex_unlock (&ne_queue_mutex);
 }
--- cluster/rgmanager/src/daemons/reslist.c	2006/06/02 17:37:10	1.13
+++ cluster/rgmanager/src/daemons/reslist.c	2006/07/11 23:52:41	1.14
@@ -33,6 +33,13 @@
 char *attr_value(resource_node_t *node, char *attrname);
 char *rg_attr_value(resource_node_t *node, char *attrname);
 
+/**
+   Format a resource's name as "<rule type>:<value of first attribute>".
+
+   @param buf		Destination buffer.
+   @param buflen	Size of buf; snprintf() truncates output to fit.
+   @param res		Resource whose r_rule->rr_type and r_attrs[0].ra_value
+ 			are read.
+ */
+void
+res_build_name(char *buf, size_t buflen, resource_t *res)
+{
+	const char *type = res->r_rule->rr_type;
+	const char *value = res->r_attrs[0].ra_value;
+
+	snprintf(buf, buflen, "%s:%s", type, value);
+}
+
 /**
    Find and determine an attribute's value. 
 
@@ -265,11 +272,23 @@
 find_root_by_ref(resource_t **reslist, char *ref)
 {
 	resource_t *curr;
+	char ref_buf[128];
+	char *type;
+	char *name;
 	int x;
 
+	snprintf(ref_buf, sizeof(ref_buf), "%s", ref);
+
+	type = ref_buf;
+	if ((name = strchr(ref_buf, ':'))) {
+		*name = 0;
+		name++;
+	} else {
+		/* Default type */
+		type = "service";
+	}
+
 	list_do(reslist, curr) {
-		if (curr->r_rule->rr_root == 0)
-			continue;
 
 		/*
 		   This should be one operation - the primary attr
@@ -277,15 +296,18 @@
 		 */
 		for (x = 0; curr->r_attrs && curr->r_attrs[x].ra_name;
 		     x++) {
+			if (strcmp(type, curr->r_rule->rr_type))
+				continue;
 			if (!(curr->r_attrs[x].ra_flags & RA_PRIMARY))
 				continue;
-			if (strcmp(ref, curr->r_attrs[x].ra_value))
+			if (strcmp(name, curr->r_attrs[x].ra_value))
 				continue;
 
 			return curr;
 		}
 	} while (!list_done(reslist, curr));
 
+
 	return NULL;
 }
 
@@ -447,8 +469,6 @@
 	int x;
 
 	printf("Resource type: %s", res->r_rule->rr_type);
-	if (res->r_rule->rr_root)
-		printf(" [ROOT]");
 	if (res->r_flags & RF_INLINE)
 		printf(" [INLINE]");
 	if (res->r_flags & RF_NEEDSTART)
--- cluster/rgmanager/src/daemons/restree.c	2006/06/02 17:37:10	1.19
+++ cluster/rgmanager/src/daemons/restree.c	2006/07/11 23:52:41	1.20
@@ -1,5 +1,5 @@
 /*
-  Copyright Red Hat, Inc. 2004
+  Copyright Red Hat, Inc. 2004-2006
 
   This program is free software; you can redistribute it and/or modify it
   under the terms of the GNU General Public License as published by the
--- cluster/rgmanager/src/daemons/rg_forward.c	2006/06/02 17:37:10	1.4
+++ cluster/rgmanager/src/daemons/rg_forward.c	2006/07/11 23:52:41	1.5
@@ -47,24 +47,26 @@
 {
 	rg_state_t rgs;
 	request_t *req = (request_t *)arg;
-	void *lockp;
+	struct dlm_lksb lockp;
 	msgctx_t ctx;
 	SmMessageSt msg;
 
 	if (rg_lock(req->rr_group, &lockp) != 0) {
 		msg_close(req->rr_resp_ctx);
+		msg_free_ctx(req->rr_resp_ctx);
 		rq_free(req);
 		pthread_exit(NULL);
 	}
 
 	if (get_rg_state(req->rr_group, &rgs) != 0) {
-		rg_unlock(req->rr_group, lockp);
+		rg_unlock(&lockp);
 		msg_close(req->rr_resp_ctx);
+		msg_free_ctx(req->rr_resp_ctx);
 		rq_free(req);
 		pthread_exit(NULL);
 	}
 
-	rg_unlock(req->rr_group, lockp);
+	rg_unlock(&lockp);
 
 	/* Construct message */
 	build_message(&msg, req->rr_request, req->rr_group, req->rr_target);
@@ -74,8 +76,9 @@
 	       rg_req_str(req->rr_request), (int)rgs.rs_owner);
 	 */
 
-	if (msg_open(rgs.rs_owner, RG_PORT, &ctx, 10) < 0)  {
+	if (msg_open(MSG_CLUSTER, rgs.rs_owner, RG_PORT, &ctx, 10) < 0)  {
 		msg_close(req->rr_resp_ctx);
+		msg_free_ctx(req->rr_resp_ctx);
 		rq_free(req);
 		pthread_exit(NULL);
 	}
@@ -83,6 +86,7 @@
 	if (msg_send(&ctx, &msg, sizeof(msg)) != sizeof(msg)) {
 		msg_close(&ctx);
 		msg_close(req->rr_resp_ctx);
+		msg_free_ctx(req->rr_resp_ctx);
 		rq_free(req);
 		pthread_exit(NULL);
 	}
@@ -90,6 +94,7 @@
 	if (msg_receive(&ctx, &msg, sizeof(msg),10) != sizeof(msg)) {
 		msg_close(&ctx);
 		msg_close(req->rr_resp_ctx);
+		msg_free_ctx(req->rr_resp_ctx);
 		rq_free(req);
 		pthread_exit(NULL);
 	}
--- cluster/rgmanager/src/daemons/rg_state.c	2006/06/02 17:37:10	1.16
+++ cluster/rgmanager/src/daemons/rg_state.c	2006/07/11 23:52:41	1.17
@@ -26,6 +26,7 @@
 #include <string.h>
 #include <resgroup.h>
 #include <clulog.h>
+#include <lock.h>
 #include <rg_locks.h>
 #include <ccs.h>
 #include <rg_queue.h>
@@ -41,6 +42,7 @@
 int set_rg_state(char *servicename, rg_state_t *svcblk);
 int get_rg_state(char *servicename, rg_state_t *svcblk);
 void get_recovery_policy(char *rg_name, char *buf, size_t buflen);
+int check_depend_safe(char *servicename);
 
 
 uint64_t
@@ -67,11 +69,35 @@
 }
 
 
+/**
+  Send an RG_EVENT message carrying a service name and state on RG_PORT.
+  The message is opened with MSG_CLUSTER and node ID 0 (presumably the
+  "all nodes" address -- confirm against msg_open()).  Delivery is best
+  effort: failures to open or send are silently ignored.
+
+  @param svcName	Service name; truncated to fit d_svcName if needed.
+  @param state		Value placed in the message's d_action field.
+ */
+void
+broadcast_event(char *svcName, uint32_t state)
+{
+	SmMessageSt msgp;
+	msgctx_t everyone;
+
+	/*
+	 * Zero the whole message first: header fields we do not set below
+	 * (e.g. gh_arg1) and any padding would otherwise go out on the wire
+	 * as uninitialized stack contents.
+	 */
+	memset(&msgp, 0, sizeof(msgp));
+
+	msgp.sm_hdr.gh_magic = GENERIC_HDR_MAGIC;
+	msgp.sm_hdr.gh_command = RG_EVENT;
+	msgp.sm_hdr.gh_length = sizeof(msgp);
+	msgp.sm_data.d_action = state;
+	/*
+	 * Copy at most size-1 bytes; the final byte stays 0 from the memset
+	 * above, so the name is always NUL-terminated.
+	 */
+	strncpy(msgp.sm_data.d_svcName, svcName,
+		sizeof(msgp.sm_data.d_svcName) - 1);
+	msgp.sm_data.d_svcOwner = 0;
+	msgp.sm_data.d_ret = 0;
+
+	swab_SmMessageSt(&msgp);
+
+	if (msg_open(MSG_CLUSTER, 0, RG_PORT, &everyone, 0) < 0)
+		return;
+
+	/* Best effort: the send result is intentionally not checked */
+	msg_send(&everyone, &msgp, sizeof(msgp));
+	msg_close(&everyone);
+}
+
+
 int
 svc_report_failure(char *svcName)
 {
-#if 0
-	void *lockp = NULL;
+	struct dlm_lksb lockp;
 	rg_state_t svcStatus;
 	char *nodeName;
 	cluster_member_list_t *membership;
@@ -85,10 +111,10 @@
 	if (get_rg_state(svcName, &svcStatus) != 0) {
 		clulog(LOG_ERR, "#42: Couldn't obtain status for RG %s\n",
 		       svcName);
-		clu_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		return -1;
 	}
-	rg_unlock(svcName, lockp);
+	rg_unlock(&lockp);
 
 	membership = member_list();
 	nodeName = memb_id_to_name(membership, svcStatus.rs_last_owner);
@@ -107,104 +133,27 @@
 	       "#4: Administrator intervention required.\n",
 	       svcName, nodeName);
 
-#endif
 	return 0;
 }
 
 
 int
-clu_lock_verbose(char *resource, int dflt_flags, void **lockpp)
-{
-#if 0
-	int ret, timed_out = 0;
-	struct timeval start, now;
-	uint64_t nodeid, *p;
-	int flags;
-	int block = !(dflt_flags & CLK_NOWAIT);
-
-	/* Holder not supported for this call */
-	dflt_flags &= ~CLK_HOLDER;
-
-	flags = dflt_flags;
-
-	if (block) {
-		gettimeofday(&start, NULL);
-		start.tv_sec += 30;
-	}
-	while (1) {
-		if (block) {
-			gettimeofday(&now, NULL);
-
-			if ((now.tv_sec > start.tv_sec) ||
-			    ((now.tv_sec == start.tv_sec) &&
-	 		     (now.tv_usec >= start.tv_usec))) {
-
-				gettimeofday(&start, NULL);
-				start.tv_sec += 30;
-
-				timed_out = 1;
-				flags |= CLK_HOLDER;
-			}
-		}
-
-		*lockpp = NULL;
-		ret = clu_lock(resource, flags | CLK_NOWAIT, lockpp);
-
-		if ((ret != 0) && (errno == EAGAIN) && block) {
-			if (timed_out) {
-				p = (uint64_t *)*lockpp;
-				if (p) {
-					nodeid = *p;
-					clulog(LOG_WARNING, "Node ID:%08x%08x"
-					       " stuck with lock %s\n",
-					       (uint32_t)(nodeid>>32&0xffffffff),
-					       (uint32_t)nodeid&0xffffffff,
-					       resource);
-					free(p);
-				} else {
-					clulog(LOG_WARNING, "Starving for lock"
-					       " %s\n", resource);
-				}
-				flags = dflt_flags;
-				timed_out = 0;
-			}
-			usleep(random()&32767<<1);
-			continue;
-
-		} else if (ret == 0) {
-			/* Success */
-			return 0;
-		}
-
-		break;
-	}
-
-	return ret;
-#endif
-	return -1;
-}
-
-
-int
 #ifdef DEBUG
-_rg_lock(char *name, void **p)
+_rg_lock(char *name, struct dlm_lksb *p)
 #else
-rg_lock(char *name, void **p)
+rg_lock(char *name, struct dlm_lksb *p)
 #endif
 {
-#if 0
 	char res[256];
 
 	snprintf(res, sizeof(res), "usrm::rg=\"%s\"", name);
-	return clu_lock_verbose(res, CLK_EX, p);
-#endif
-	return -1;
+	return clu_lock(LKM_EXMODE, p, 0, res);
 }
 
 
 #ifdef DEBUG
 int
-_rg_lock_dbg(char *name, void **p, char *file, int line)
+_rg_lock_dbg(char *name, struct dlm_lksb *p, char *file, int line)
 {
 	dprintf("rg_lock(%s) @ %s:%d\n", name, file, line);
 	return _rg_lock(name, p);
@@ -215,27 +164,21 @@
 
 int
 #ifdef DEBUG
-_rg_unlock(char *name, void *p)
+_rg_unlock(struct dlm_lksb *p)
 #else
-rg_unlock(char *name, void *p)
+rg_unlock(struct dlm_lksb *p)
 #endif
 {
-#if 0
-	char res[256];
-
-	snprintf(res, sizeof(res), "usrm::rg=\"%s\"", name);
-	return clu_unlock(res, p);
-#endif
-	return -1;
+	return clu_unlock(p);
 }
 
 
 #ifdef DEBUG
 int
-_rg_unlock_dbg(char *name, void *p, char *file, int line)
+_rg_unlock_dbg(void *p, char *file, int line)
 {
-	dprintf("rg_unlock(%s) @ %s:%d\n", name, file, line);
-	return _rg_unlock(name, p);
+	dprintf("rg_unlock() @ %s:%d\n", file, line);
+	return _rg_unlock(p);
 }
 #endif
 
@@ -293,7 +236,6 @@
 int
 set_rg_state(char *name, rg_state_t *svcblk)
 {
-#if 0
 	cluster_member_list_t *membership;
 	char res[256];
 	int ret;
@@ -307,8 +249,6 @@
        		       sizeof(*svcblk));
 	free_member_list(membership);
 	return ret;
-#endif
-	return -1;
 }
 
 
@@ -329,7 +269,6 @@
 int
 get_rg_state(char *name, rg_state_t *svcblk)
 {
-#if 0
 	char res[256];
 	int ret;
 	void *data = NULL;
@@ -381,8 +320,7 @@
 	free(data);
 	free_member_list(membership);
 
-#endif
-	return -1;
+	return 0;
 }
 
 
@@ -390,7 +328,6 @@
 int
 get_rg_state_local(char *name, rg_state_t *svcblk)
 {
-#if 0
 	char res[256];
 	int ret;
 	void *data = NULL;
@@ -422,9 +359,7 @@
 	/* Copy out the data. */
 	memcpy(svcblk, data, sizeof(*svcblk));
 	free(data);
-
-#endif
-	return -1;
+	return 0;
 }
 
 
@@ -714,7 +649,7 @@
 int
 svc_start(char *svcName, int req)
 {
-	void *lockp = NULL;
+	struct dlm_lksb lockp;
 	int ret;
 	rg_state_t svcStatus;
 
@@ -725,7 +660,7 @@
 	}
 
 	if (get_rg_state(svcName, &svcStatus) != 0) {
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		clulog(LOG_ERR, "#46: Failed getting status for RG %s\n",
 		       svcName);
 		return FAIL;
@@ -734,13 +669,13 @@
 	/* LOCK HELD */
 	switch (svc_advise_start(&svcStatus, svcName, req)) {
 	case 0: /* Don't start service, return FAIL */
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		return FAIL;
 	case 2: /* Don't start service, return 0 */
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		return 0;
 	case 3:
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		return RG_EAGAIN;
 	default:
 		break;
@@ -760,11 +695,11 @@
 	if (set_rg_state(svcName, &svcStatus) != 0) {
 		clulog(LOG_ERR,
 		       "#47: Failed changing service status\n");
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		return FAIL;
 	}
 	
-	rg_unlock(svcName, lockp);
+	rg_unlock(&lockp);
 
 	ret = group_op(svcName, RG_START);
 	ret = !!ret; /* Either it worked or it didn't.  Ignore all the
@@ -780,19 +715,22 @@
 	if (set_rg_state(svcName, &svcStatus) != 0) {
 		clulog(LOG_ERR,
 		       "#75: Failed changing service status\n");
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		return FAIL;
 	}
-	rg_unlock(svcName, lockp);
+	rg_unlock(&lockp);
        
-	if (ret == 0)
+	if (ret == 0) {
 		clulog(LOG_NOTICE,
 		       "Service %s started\n",
 		       svcName);
-	else
+
+		broadcast_event(svcName, RG_STATE_STARTED);
+	} else {
 		clulog(LOG_WARNING,
 		       "#68: Failed to start %s; return value: %d\n",
 		       svcName, ret);
+	}
 
 	return ret;
 }
@@ -807,7 +745,7 @@
 int
 svc_status(char *svcName)
 {
-	void *lockp = NULL;
+	struct dlm_lksb lockp;
 	rg_state_t svcStatus;
 
 	if (rg_lock(svcName, &lockp) < 0) {
@@ -817,12 +755,12 @@
 	}
 
 	if (get_rg_state(svcName, &svcStatus) != 0) {
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		clulog(LOG_ERR, "#49: Failed getting status for RG %s\n",
 		       svcName);
 		return FAIL;
 	}
-	rg_unlock(svcName, lockp);
+	rg_unlock(&lockp);
 
 	if (svcStatus.rs_owner != my_id())
 		/* Don't check status for anything not owned */
@@ -847,7 +785,7 @@
 static int
 _svc_stop(char *svcName, int req, int recover, uint32_t newstate)
 {
-	void *lockp = NULL;
+	struct dlm_lksb lockp;
 	rg_state_t svcStatus;
 	int ret;
 
@@ -864,7 +802,7 @@
 	}
 
 	if (get_rg_state(svcName, &svcStatus) != 0) {
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		clulog(LOG_ERR, "#51: Failed getting status for RG %s\n",
 		       svcName);
 		return FAIL;
@@ -872,18 +810,18 @@
 
 	switch (svc_advise_stop(&svcStatus, svcName, req)) {
 	case 0:
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		clulog(LOG_DEBUG, "Unable to stop RG %s in %s state\n",
 		       svcName, rg_state_str(svcStatus.rs_state));
 		return FAIL;
 	case 2:
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		return SUCCESS;
 	case 3:
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		return RG_EFORWARD;
 	case 4:
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		return RG_EAGAIN;
 	default:
 		break;
@@ -900,11 +838,11 @@
 	//printf("rg state = %s\n", rg_state_str(svcStatus.rs_state));
 
 	if (set_rg_state(svcName, &svcStatus) != 0) {
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		clulog(LOG_ERR, "#52: Failed changing RG status\n");
 		return FAIL;
 	}
-	rg_unlock(svcName, lockp);
+	rg_unlock(&lockp);
 
 	ret = group_op(svcName, RG_STOP);
 
@@ -918,7 +856,7 @@
 _svc_stop_finish(char *svcName, int failed, uint32_t newstate)
 {
 	rg_state_t svcStatus;
-	void *lockp;
+	struct dlm_lksb lockp;
 
 	if (rg_lock(svcName, &lockp) == FAIL) {
 		clulog(LOG_ERR, "#53: Unable to obtain cluster lock: %s\n",
@@ -927,7 +865,7 @@
 	}
 
 	if (get_rg_state(svcName, &svcStatus) != 0) {
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		clulog(LOG_ERR, "#54: Failed getting status for RG %s\n",
 		       svcName);
 		return FAIL;
@@ -935,7 +873,7 @@
 
 	if ((svcStatus.rs_state != RG_STATE_STOPPING) &&
 	     (svcStatus.rs_state != RG_STATE_ERROR)) {
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		return 0;
 	}
 
@@ -945,11 +883,13 @@
 	if (failed) {
 		clulog(LOG_CRIT, "#12: RG %s failed to stop; intervention "
 		       "required\n", svcName);
-		svcStatus.rs_state = RG_STATE_FAILED;
-	} else if (svcStatus.rs_state == RG_STATE_ERROR)
+		newstate = RG_STATE_FAILED;
+	} else if (svcStatus.rs_state == RG_STATE_ERROR) {
 		svcStatus.rs_state = RG_STATE_RECOVER;
-	else
-		svcStatus.rs_state = newstate;
+		newstate = RG_STATE_RECOVER;
+	}
+
+	svcStatus.rs_state = newstate;
 
 	clulog(LOG_NOTICE, "Service %s is %s\n", svcName,
 	       rg_state_str(svcStatus.rs_state));
@@ -957,11 +897,13 @@
 
 	svcStatus.rs_transition = (uint64_t)time(NULL);
 	if (set_rg_state(svcName, &svcStatus) != 0) {
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		clulog(LOG_ERR, "#55: Failed changing RG status\n");
 		return FAIL;
 	}
-	rg_unlock(svcName, lockp);
+	rg_unlock(&lockp);
+
+	broadcast_event(svcName, newstate);
 
 	return 0;
 }
@@ -999,7 +941,7 @@
 int
 svc_fail(char *svcName)
 {
-	void *lockp = NULL;
+	struct dlm_lksb lockp;
 	rg_state_t svcStatus;
 
 	if (rg_lock(svcName, &lockp) == FAIL) {
@@ -1011,7 +953,7 @@
 	clulog(LOG_DEBUG, "Handling failure request for RG %s\n", svcName);
 
 	if (get_rg_state(svcName, &svcStatus) != 0) {
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		clulog(LOG_ERR, "#56: Failed getting status for RG %s\n",
 		       svcName);
 		return FAIL;
@@ -1019,7 +961,7 @@
 
 	if ((svcStatus.rs_state == RG_STATE_STARTED) &&
 	    (svcStatus.rs_owner != my_id())) {
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		clulog(LOG_DEBUG, "Unable to disable RG %s in %s state\n",
 		       svcName, rg_state_str(svcStatus.rs_state));
 		return FAIL;
@@ -1036,11 +978,13 @@
 	svcStatus.rs_transition = (uint64_t)time(NULL);
 	svcStatus.rs_restarts = 0;
 	if (set_rg_state(svcName, &svcStatus) != 0) {
-		rg_unlock(svcName, lockp);
+		rg_unlock(&lockp);
 		clulog(LOG_ERR, "#57: Failed changing RG status\n");
 		return FAIL;
 	}
-	rg_unlock(svcName, lockp);
+	rg_unlock(&lockp);
+
+	broadcast_event(svcName, RG_STATE_FAILED);
 
 	return 0;
 }
@@ -1068,7 +1012,7 @@
 
 	/* Open a connection to the other node */
 
-	if (msg_open(target, RG_PORT, &ctx, 2)< 0) {
+	if (msg_open(MSG_CLUSTER, target, RG_PORT, &ctx, 2)< 0) {
 		clulog(LOG_ERR,
 		       "#58: Failed opening connection to member #%d\n",
 		       target);
@@ -1355,6 +1299,11 @@
 		return FAIL;
 	}
 	free_member_list(membership);
+
+	/* Check for dependency.  We cannot start unless our
+	   dependency is met */
+	if (check_depend_safe(svcName) == 0)
+		return RG_EDEPEND;
 	
 	/*
 	 * This is a 'root' start request.  We need to clear out our failure
--- cluster/rgmanager/src/daemons/test.c	2005/03/21 22:00:31	1.4
+++ cluster/rgmanager/src/daemons/test.c	2006/07/11 23:52:41	1.5
@@ -1,3 +1,21 @@
+/*
+  Copyright Red Hat, Inc. 2004-2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge, 
+  MA 02139, USA.
+*/
 #include <libxml/parser.h>
 #include <libxml/xmlmemory.h>
 #include <libxml/xpath.h>
--- cluster/rgmanager/src/resources/service.sh	2006/06/02 17:37:10	1.5
+++ cluster/rgmanager/src/resources/service.sh	2006/07/11 23:52:41	1.6
@@ -125,6 +125,17 @@
             </shortdesc>
             <content type="string"/>
         </parameter>
+
+        <parameter name="depend">
+            <longdesc lang="en">
+		Top-level service this depends on, in "service:name" format.
+            </longdesc>
+            <shortdesc lang="en">
+		Service dependency; will not start without the specified
+		service running.
+            </shortdesc>
+            <content type="string"/>
+        </parameter>
     </parameters>
 
     <actions>
--- cluster/rgmanager/src/utils/clustat.c	2006/06/02 17:37:11	1.17
+++ cluster/rgmanager/src/utils/clustat.c	2006/07/11 23:52:41	1.18
@@ -6,6 +6,7 @@
 #include <libgen.h>
 #include <ncurses.h>
 #include <term.h>
+#include <rg_types.h>
 #include <termios.h>
 #include <ccs.h>
 #include <libcman.h>
@@ -53,7 +54,7 @@
 
 	struct timeval tv;
 
-	if (msg_open(0, RG_PORT, &ctx, 10) < 0) {
+	if (msg_open(MSG_SOCKET, 0, 0, &ctx, 10) < 0) {
 		return NULL;
 	}
 
@@ -107,6 +108,7 @@
 		}
 
 		swab_generic_msg_hdr(msgp);
+
 		if (msgp->gh_command == RG_SUCCESS) {
 			free(msgp);
 			break;
@@ -117,7 +119,6 @@
 			return NULL;
 		}
 
-
 		rsmp = (rg_state_msg_t *)msgp;
 
 		swab_rg_state_t(&rsmp->rsm_state);
@@ -157,46 +158,63 @@
 	char buf[128];
 	char *name;
 	cluster_member_list_t *ret = NULL;
+	cman_node_t *nodes = NULL;
 
 	desc = ccs_connect();
 	if (desc < 0) {
 		return NULL;
 	}
 
-	x = 1;
+	while ((ret = malloc(sizeof(*ret))) == NULL)
+		sleep(1);
 	
-	snprintf(buf, sizeof(buf),
-		 "/cluster/clusternodes/clusternode[%d]/@name", x);
-	while (ccs_get(desc, buf, &name) == 0) {
-		if (!ret) {
-			ret = malloc(cml_size(x));
-			if (!ret) {
+	x = 1;
+	while (1) {
+		snprintf(buf, sizeof(buf),
+			"/cluster/clusternodes/clusternode[%d]/@name", x);
+
+		if (ccs_get(desc, buf, &name) != 0)
+			break;
+
+		if (!nodes) {
+			nodes = malloc(x * sizeof(cman_node_t));
+			if (!nodes ) {
 				perror("malloc");
 				ccs_disconnect(desc);
 				exit(1);
 			}
-			memset(ret, 0, cml_size(x));
+			memset(nodes, 0, x * sizeof(cman_node_t));
 		} else {
-			ret = realloc(ret, cml_size(x));
-			if (!ret) {
+			nodes = realloc(ret, x * sizeof(cman_node_t));
+			if (!nodes) {
 				perror("realloc");
 				ccs_disconnect(desc);
 				exit(1);
 			}
 		}
 
-		memset(&ret->cml_members[x-1], 0, sizeof(cman_node_t));
-		strncpy(ret->cml_members[x-1].cn_name, name,
-			sizeof(ret->cml_members[x-1].cn_name));
+		memset(&nodes[x-1], 0, sizeof(cman_node_t));
+		strncpy(nodes[x-1].cn_name, name,
+			sizeof(nodes[x-1].cn_name));
 		free(name);
 
+		/* Add node ID */
+		snprintf(buf, sizeof(buf),
+			 "/cluster/clusternodes/clusternode[%d]/@nodeid", x);
+		if (ccs_get(desc, buf, &name) == 0) {
+			nodes[x-1].cn_nodeid = atoi(name);
+			free(name);
+		}
+
 		ret->cml_count = x;
 		++x;
-		snprintf(buf, sizeof(buf),
-			 "/cluster/clusternodes/clusternode[%d]/@name", x);
 	}
 
 	ccs_disconnect(desc);
+
+	ret->cml_members = nodes;
+
+
 	return ret;
 }
 
@@ -238,9 +256,12 @@
 		if (!m) {
 			printf("%s not found\n", these->cml_members[x].cn_name);
 			/* WTF? It's not in our config */
-			printf("realloc %d\n", (int)cml_size((all->cml_count+1)));
-			all = realloc(all, cml_size((all->cml_count+1)));
-			if (!all) {
+			printf("realloc %d\n", (int)((all->cml_count+1) *
+			       sizeof(cman_node_t)));
+			all->cml_members = realloc(all->cml_members,
+						   (all->cml_count+1) *
+						   sizeof(cman_node_t));
+			if (!all->cml_members) {
 				perror("realloc");
 				exit(1);
 			}
@@ -440,7 +461,8 @@
 void
 txt_member_state(cman_node_t *node)
 {
-	printf("  %-40.40s ", node->cn_name);
+	printf("  %-34.34s %4d ", node->cn_name,
+	       node->cn_nodeid);
 
 	if (node->cn_member & FLAG_UP)
 		printf("Online");
@@ -481,8 +503,8 @@
 {
 	int x;
 
-	printf("  %-40.40s %s\n", "Member Name", "Status");
-	printf("  %-40.40s %s\n", "------ ----", "------");
+	printf("  %-34.34s %-4.4s %s\n", "Member Name", "ID", "Status");
+	printf("  %-34.34s %-4.4s %s\n", "------ ----", "----", "------");
 
 	for (x = 0; x < membership->cml_count; x++) {
 		if (name && strcmp(membership->cml_members[x].cn_name, name))
--- cluster/rgmanager/src/utils/clusvcadm.c	2006/06/02 17:37:11	1.8
+++ cluster/rgmanager/src/utils/clusvcadm.c	2006/07/11 23:52:41	1.9
@@ -72,7 +72,7 @@
 	membership = get_member_list(ch);
 	me = get_my_nodeid(ch);
 
-	if (msg_open(0, RG_PORT, &ctx, 5) < 0) {
+	if (msg_open(MSG_SOCKET, 0, RG_PORT, &ctx, 5) < 0) {
 		printf("Could not connect to resource group manager\n");
 		goto out;
 	}
@@ -179,7 +179,7 @@
 main(int argc, char **argv)
 {
 	extern char *optarg;
-	char *svcname=NULL, nodename[256];
+	char *svcname=NULL, nodename[256], realsvcname[64];
 	int opt;
 	msgctx_t ctx;
 	cman_handle_t ch;
@@ -259,7 +259,12 @@
 		usage(basename(argv[0]));
 		return 1;
 	}
-	
+
+	if (!strchr(svcname,':')) {
+		snprintf(realsvcname, sizeof(realsvcname), "service:%s",
+			 svcname);
+		svcname = realsvcname;
+	}
 
 	/* No login */
 	ch = cman_init(NULL);
@@ -287,18 +292,19 @@
 				   */
 	}
 	
+	strcpy(nodename,"me");
 	build_message(&msg, action, svcname, svctarget);
 
 	if (action != RG_RELOCATE) {
 		printf("Member %s %s %s", nodename, actionstr, svcname);
 		printf("...");
 		fflush(stdout);
-		msg_open(0, RG_PORT, &ctx, 5);
+		msg_open(MSG_SOCKET, 0, RG_PORT, &ctx, 5);
 	} else {
 		printf("Trying to relocate %s to %s", svcname, nodename);
 		printf("...");
 		fflush(stdout);
-		msg_open(0, RG_PORT, &ctx, 5);
+		msg_open(MSG_SOCKET, 0, RG_PORT, &ctx, 5);
 	}
 
 	if (ctx.type < 0) {
@@ -307,7 +313,9 @@
 		return 1;
 	}
 
-	if (msg_send(&ctx, &msg, sizeof(msg)) != sizeof(msg)) {
+	opt = msg_send(&ctx, &msg, sizeof(msg));
+
+	if (opt < sizeof(msg)) {
 		perror("msg_send");
 		fprintf(stderr, "Could not send entire message!\n");
 		return 1;
@@ -340,6 +348,9 @@
 	case RG_EAGAIN:
 		printf("failed: Try again (resource groups locked)\n");
 		break;
+	case RG_EDEPEND:
+		printf("failed: Operation would break dependency\n");
+		break;
 	default:
 		printf("failed: unknown reason %d\n", msg.sm_data.d_ret);
 		break;


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]