rpms/openais/devel revision-1223.patch, NONE, 1.1 revision-1230.patch, NONE, 1.1 revision-1245.patch, NONE, 1.1 openais.spec, 1.12, 1.13

fedora-cvs-commits at redhat.com fedora-cvs-commits at redhat.com
Mon Sep 25 17:34:13 UTC 2006


Author: sdake

Update of /cvs/dist/rpms/openais/devel
In directory cvs.devel.redhat.com:/tmp/cvs-serv25857/devel

Modified Files:
	openais.spec 
Added Files:
	revision-1223.patch revision-1230.patch revision-1245.patch 
Log Message:
auto-import openais-0.80.1-1.1 on branch devel from openais-0.80.1-1.1.src.rpm

revision-1223.patch:
 ckpt.c |    2 ++
 1 files changed, 2 insertions(+)

--- NEW FILE revision-1223.patch ---
Index: lib/ckpt.c
===================================================================
--- lib/ckpt.c	(revision 1222)
+++ lib/ckpt.c	(revision 1223)
@@ -1500,10 +1500,12 @@
 	for (i = 0; i < numberOfElements; i++) {
 		if (ioVector[i].dataSize == 0) {
 			*erroneousVectorIndex = i;
+                       error = SA_AIS_ERR_INVALID_PARAM;
 			goto error_put;
 		}
 		if (ioVector[i].dataBuffer == NULL) {
 			*erroneousVectorIndex = i;
+                       error = SA_AIS_ERR_INVALID_PARAM;
 			goto error_put;
 		}
 	}

revision-1230.patch:
 amfsg.c |    1 +
 1 files changed, 1 insertion(+)

--- NEW FILE revision-1230.patch ---
Index: exec/amfsg.c
===================================================================
--- exec/amfsg.c	(revision 1229)
+++ exec/amfsg.c	(revision 1230)
@@ -141,6 +141,7 @@
 
 #include <stdlib.h>
 #include <errno.h>
+#include <assert.h>
 
 #include "amf.h"
 #include "print.h"

revision-1245.patch:
 exec/Makefile     |    4 
 exec/amf.c        |    1 
 exec/amfcomp.c    |    3 
 exec/cfg.c        |    1 
 exec/ckpt.c       |    1 
 exec/clm.c        |    1 
 exec/cpg.c        |   41 ++++
 exec/evs.c        |    1 
 exec/evt.c        |    1 
 exec/flow.c       |  463 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 exec/flow.h       |   76 ++++++++
 exec/ipc.c        |  189 +++++++++++++++++++++-
 exec/ipc.h        |   21 ++
 exec/lck.c        |    1 
 exec/main.c       |    4 
 exec/main.h       |    2 
 exec/msg.c        |    1 
 exec/service.h    |    1 
 include/cpg.h     |   10 +
 include/hdb.h     |    2 
 include/ipc_cpg.h |    5 
 include/queue.h   |   12 +
 lib/cpg.c         |   25 ++
 test/Makefile     |    5 
 test/cpgbench.c   |  175 ++++++++++++++++++++
 25 files changed, 1033 insertions(+), 13 deletions(-)

--- NEW FILE revision-1245.patch ---
Index: test/cpgbench.c
===================================================================
--- test/cpgbench.c	(revision 0)
+++ test/cpgbench.c	(revision 1245)
@@ -0,0 +1,175 @@
+#define _BSD_SOURCE
+/*
+ * Copyright (c) 2006 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake at mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <unistd.h>
+#include <errno.h>
+#include <unistd.h>
+#include <time.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/un.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "saAis.h"
+#include "cpg.h"
+
+int alarm_notice;
+
+void cpg_bm_confchg_fn (
+	cpg_handle_t handle,
+	struct cpg_name *group_name,
+	struct cpg_address *member_list, int member_list_entries,
+	struct cpg_address *left_list, int left_list_entries,
+	struct cpg_address *joined_list, int joined_list_entries)
+{
+}
+
+unsigned int write_count;
+
+void cpg_bm_deliver_fn (
+        cpg_handle_t handle,
+        struct cpg_name *group_name,
+        uint32_t nodeid,
+        uint32_t pid,
+        void *msg,
+        int msg_len)
+{
+	write_count++;
+}
+
+cpg_callbacks_t callbacks = {
+	.cpg_deliver_fn 	= cpg_bm_deliver_fn,
+	.cpg_confchg_fn		= cpg_bm_confchg_fn
+};
+
+char data[500000];
+
+void cpg_benchmark (
+	cpg_handle_t handle,
+	int write_size)
+{
+	/*
+	 * Multicast write_size byte messages to the joined group for ten
+	 * seconds, then print the delivery count and the resulting throughput.
+	 */
+	struct timeval tv1, tv2, tv_elapsed;
+	struct iovec iov;
+	unsigned int res;
+	cpg_flow_control_state_t flow_control_state;
+	double seconds;
+
+	alarm_notice = 0;
+	iov.iov_base = data;
+	iov.iov_len = write_size;
+
+	write_count = 0;
+	alarm (10); /* sigalrm_handler sets alarm_notice when this expires */
+
+	gettimeofday (&tv1, NULL);
+	do {
+		/* Send only while flow control is off; retry until accepted */
+		cpg_flow_control_state_get (handle, &flow_control_state);
+		if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
+			do {
+				res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
+			} while (res == CPG_ERR_TRY_AGAIN);
+		}
+		res = cpg_dispatch (handle, CPG_DISPATCH_ALL);
+		if (res != CPG_OK) {
+			printf ("cpg dispatch returned error %d\n", res);
+			exit (1);
+		}
+	} while (alarm_notice == 0);
+	gettimeofday (&tv2, NULL);
+	timersub (&tv2, &tv1, &tv_elapsed);
+	seconds = tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0);
+
+	printf ("%5u messages received ", write_count);
+	printf ("%5d bytes per write ", write_size);
+	printf ("%7.3f Seconds runtime ", seconds);
+	printf ("%9.3f TP/s ", ((double)write_count) / seconds);
+	printf ("%7.3f MB/s.\n",
+		((double)write_count) * ((double)write_size) / (seconds * 1000000.0));
+}
+
+void sigalrm_handler (int num)
+{
+	alarm_notice = 1;
+}
+
+static struct cpg_name group_name = {
+	.value = "cpg_bm",
+	.length = 6
+};
+
+int main (void) {
+	cpg_handle_t handle;
+	unsigned int size = 1;
+	int i;
+	unsigned int res;
+	/* SIGALRM ends each cpg_benchmark run (see sigalrm_handler) */
+	signal (SIGALRM, sigalrm_handler);
+	res = cpg_initialize (&handle, &callbacks);
+	if (res != CPG_OK) {
+		printf ("cpg_initialize failed with result %d\n", res);
+		exit (1);
+	}
+
+	res = cpg_join (handle, &group_name);
+	if (res != CPG_OK) {
+		printf ("cpg_join failed with result %d\n", res);
+		exit (1);
+	}
+	/* 50 runs; the message size starts at 1 byte and grows by 1000 per run */
+	for (i = 0; i < 50; i++) {
+		cpg_benchmark (handle, size);
+		size += 1000;
+	}
+
+	res = cpg_finalize (handle);
+	if (res != CPG_OK) {
+		printf ("cpg_finalize failed with result %d\n", res);
+		exit (1);
+	}
+	return (0);
+}
Index: test/Makefile
===================================================================
--- test/Makefile	(revision 1244)
+++ test/Makefile	(revision 1245)
@@ -50,7 +50,7 @@
 	testckpt ckptstress ckptbench \
 	ckptbenchth ckpt-rd ckpt-wr testevt testevs \
 	evsbench subscription publish evtbench unlink testclm2 testlck \
-	testmsg testcpg openais-cfgtool
+	testmsg testcpg cpgbench openais-cfgtool
 
 testtimer: testtimer.o $(LIBRARIES)
 	$(CC) $(LDFLAGS) -o testtimer testtimer.o ../exec/timer.o
@@ -139,6 +139,9 @@
 testcpg: testcpg.o $(LIBRARIES)
 	$(CC) $(LDFLAGS) -o testcpg testcpg.o $(LIBS)
 
+cpgbench: cpgbench.o $(LIBRARIES)
+	$(CC) $(LDFLAGS) -o cpgbench cpgbench.o $(LIBS)
+
 openais-cfgtool: openais-cfgtool.o $(LIBRARIES)
 	$(CC) $(LDFLAGS) -o openais-cfgtool openais-cfgtool.o $(LIBS)
 
Index: include/hdb.h
===================================================================
--- include/hdb.h	(revision 1244)
+++ include/hdb.h	(revision 1245)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2002-2006 MontaVista Software, Inc.
+ * Copyright (c) 2006 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -196,7 +197,6 @@
 			handle_database,
 			handle_database->iterator,
 			instance);
-		
 
 		handle_database->iterator += 1;
 		if (res == 0) {
Index: include/ipc_cpg.h
===================================================================
--- include/ipc_cpg.h	(revision 1244)
+++ include/ipc_cpg.h	(revision 1245)
@@ -56,7 +56,8 @@
 	MESSAGE_RES_CPG_CONFCHG_CALLBACK = 4,
 	MESSAGE_RES_CPG_DELIVER_CALLBACK = 5,
 	MESSAGE_RES_CPG_TRACKSTART = 6,
-	MESSAGE_RES_CPG_TRACKSTOP = 7
+	MESSAGE_RES_CPG_TRACKSTOP = 7,
+	MESSAGE_RES_CPG_FLOW_CONTROL_STATE_SET = 8
 };
 
 enum lib_cpg_confchg_reason {
@@ -111,6 +112,7 @@
 	mar_uint32_t msglen __attribute__((aligned(8)));
 	mar_uint32_t nodeid __attribute__((aligned(8)));
 	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t flow_control_state __attribute__((aligned(8)));
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
@@ -140,5 +142,4 @@
 	mar_res_header_t header __attribute__((aligned(8)));
 };
 
-
 #endif /* IPC_CPG_H_DEFINED */
Index: include/cpg.h
===================================================================
--- include/cpg.h	(revision 1244)
+++ include/cpg.h	(revision 1245)
@@ -57,6 +57,11 @@
 } cpg_guarantee_t;
 
 typedef enum {
+	CPG_FLOW_CONTROL_DISABLED,	/* flow control is disabled - new messages may be sent */
+	CPG_FLOW_CONTROL_ENABLED	/* flow control is enabled - new messages should not be sent */
+} cpg_flow_control_state_t;
+
+typedef enum {
 	CPG_OK = 1,
 	CPG_ERR_LIBRARY = 2,
 	CPG_ERR_TIMEOUT = 5,
@@ -102,7 +107,6 @@
 	void *msg,
 	int msg_len);
 
-
 typedef void (*cpg_confchg_fn_t) (
 	cpg_handle_t handle,
 	struct cpg_name *group_name,
@@ -183,4 +187,8 @@
 	struct cpg_address *member_list,
 	int *member_list_entries);
 
+cpg_error_t cpg_flow_control_state_get (
+	cpg_handle_t handle,
+	cpg_flow_control_state_t *flow_control_enabled);
+
 #endif /* OPENAIS_CPG_H_DEFINED */
Index: include/queue.h
===================================================================
--- include/queue.h	(revision 1244)
+++ include/queue.h	(revision 1245)
@@ -97,7 +97,7 @@
 	int empty;
 
 	pthread_mutex_lock (&queue->mutex);
-	empty = queue->used == 0;
+	empty = (queue->used == 0);
 	pthread_mutex_unlock (&queue->mutex);
 	return (empty);
 }
@@ -213,4 +213,14 @@
 	return (used);
 }
 
+static inline int queue_usedhw (struct queue *queue) {
+	int usedhw;
+
+	pthread_mutex_lock (&queue->mutex);
+	usedhw = queue->usedhw;
+	pthread_mutex_unlock (&queue->mutex);
+
+	return (usedhw);
+}
+
 #endif /* QUEUE_H_DEFINED */
Index: exec/cfg.c
===================================================================
--- exec/cfg.c	(revision 1244)
+++ exec/cfg.c	(revision 1245)
@@ -166,6 +166,7 @@
 	.name					= (unsigned char*)"openais configuration service",
 	.id					= CFG_SERVICE,
 	.private_data_size			= 0,
+	.flow_control				= OPENAIS_FLOW_CONTROL_NOT_REQUIRED, 
 	.lib_init_fn				= cfg_lib_init_fn,
 	.lib_exit_fn				= cfg_lib_exit_fn,
 	.lib_service				= cfg_lib_service,
Index: exec/Makefile
===================================================================
--- exec/Makefile	(revision 1244)
+++ exec/Makefile	(revision 1245)
@@ -58,9 +58,9 @@
 LCR_OBJS = evs.o clm.o ckpt.o evt.o lck.o msg.o cfg.o cpg.o aisparser.o vsf_ykd.o $(AMF_OBJS)
 
 # main executive objects
-MAIN_SRC = main.c print.c mempool.c util.c sync.c service.c ipc.c timer.c \
+MAIN_SRC = main.c print.c mempool.c util.c sync.c service.c ipc.c flow.c timer.c \
 	totemconfig.c mainconfig.c
-MAIN_OBJS = main.o print.o mempool.o util.o sync.o service.o ipc.o timer.o \
+MAIN_OBJS = main.o print.o mempool.o util.o sync.o service.o ipc.o flow.o timer.o \
 	totemconfig.o mainconfig.o ../lcr/lcr_ifact.o
 OTHER_OBJS = objdb.o
 
Index: exec/evs.c
===================================================================
--- exec/evs.c	(revision 1244)
+++ exec/evs.c	(revision 1245)
@@ -145,6 +145,7 @@
 	.name			= (unsigned char*)"openais extended virtual synchrony service",
 	.id			= EVS_SERVICE,
 	.private_data_size	= sizeof (struct evs_pd),
+	.flow_control		= OPENAIS_FLOW_CONTROL_REQUIRED, 
 	.lib_init_fn		= evs_lib_init_fn,
 	.lib_exit_fn		= evs_lib_exit_fn,
 	.lib_service		= evs_lib_service,
Index: exec/flow.c
===================================================================
--- exec/flow.c	(revision 0)
+++ exec/flow.c	(revision 1245)
@@ -0,0 +1,463 @@
+/*
+ * Copyright (c) 2006 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake at mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * New messages are allowed from the library ONLY when the processor has not
+ * received a OPENAIS_FLOW_CONTROL_STATE_ENABLED from any processor.  If a OPENAIS_FLOW_CONTROL_STATE_ENABLED
+ * message is sent, it must later be cancelled by a OPENAIS_FLOW_CONTROL_STATE_DISABLED
+ * message.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+#include <pthread.h>
+
+#include "flow.h"
+#include "totem.h"
+#include "totempg.h"
+#include "print.h"
+#include "hdb.h"
+#include "../include/list.h"
+
+#define OPENAIS_FLOW_CONTROL_ENABLED_SERVICES_MAX 128
+
+struct flow_control_instance {
+	struct list_head list_head;
+	unsigned int service;
+};
+
+DECLARE_LIST_INIT (flow_control_service_list_head);
+
+struct flow_control_message {
+	unsigned int service __attribute__((aligned(8)));
+	char id[1024] __attribute__((aligned(8)));
+	unsigned int id_len __attribute__((aligned(8)));
+	enum openais_flow_control_state flow_control_state __attribute__((aligned(8)));
+};
+
+struct flow_control_node_state {
+	unsigned int nodeid;
+	enum openais_flow_control_state flow_control_state;
+};
+
+struct flow_control_service {
+	struct flow_control_node_state flow_control_node_state[PROCESSOR_COUNT_MAX];
+	unsigned int service;
+	char id[1024];
+	unsigned int id_len;
+	void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state);
+	void *context;
+	unsigned int processor_count;
+	enum openais_flow_control_state flow_control_state;
+	struct list_head list;
+	struct list_head list_all;
+};
+
+static struct totempg_group flow_control_group = {
+	.group      = "flowcontrol",
+	.group_len  = 12
+};
+
+static totempg_groups_handle flow_control_handle;
+
+static struct hdb_handle_database flow_control_hdb = {
+	.handle_count	= 0,
+	.handles	= NULL,
+	.iterator	= 0,
+	.mutex		= PTHREAD_MUTEX_INITIALIZER
+};
+
+static unsigned int flow_control_member_list[PROCESSOR_COUNT_MAX];
+static unsigned int flow_control_member_list_entries;
+
+static inline int flow_control_xmit (
+	struct flow_control_service *flow_control_service,
+	enum openais_flow_control_state flow_control_state)
+{
+	struct flow_control_message flow_control_message;
+	struct iovec iovec;
+
+	/*
+	 * Multicast the service's new state.  The whole structure is sent, so
+	 * zero it first to keep stack contents out of id[] and the padding
+	 */
+	memset (&flow_control_message, 0, sizeof (flow_control_message));
+	flow_control_message.service = flow_control_service->service;
+	flow_control_message.flow_control_state = flow_control_state;
+	memcpy (flow_control_message.id, flow_control_service->id,
+		flow_control_service->id_len);
+	flow_control_message.id_len = flow_control_service->id_len;
+	iovec.iov_base = (char *)&flow_control_message;
+	iovec.iov_len = sizeof (flow_control_message);
+	return (totempg_groups_mcast_joined (flow_control_handle, &iovec, 1,
+		TOTEMPG_AGREED));
+}
+
+static void flow_control_deliver_fn (
+	unsigned int nodeid,
+	struct iovec *iovec,
+	int iov_len,
+	int endian_conversion_required)
+{
+	struct flow_control_message *flow_control_message = (struct flow_control_message *)iovec[0].iov_base;
+	struct flow_control_service *flow_control_service;
+	struct list_head *list;
+	unsigned int i;
+
+	for (list = flow_control_service_list_head.next;
+		list != &flow_control_service_list_head;
+		list = list->next) {
+
+		flow_control_service = list_entry (list, struct flow_control_service, list_all);
+		/*
+		 * Find this nodeid in the flow control service and set the message
+		 * enabled or disabled flag
+		 */
+		for (i = 0; i < flow_control_service->processor_count; i++) {
+			if (nodeid == flow_control_service->flow_control_node_state[i].nodeid) {
+				flow_control_service->flow_control_node_state[i].flow_control_state =
+					flow_control_message->flow_control_state;
+				break;
+			}
+		}
+
+		/*
+		 * Determine if any flow control is enabled on any nodes and set
+		 * the internal variable appropriately
+		 */
+		flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
+		flow_control_service->flow_control_state_set_fn (flow_control_service->context, flow_control_service->flow_control_state);
+		for (i = 0; i < flow_control_service->processor_count; i++) {
+			if (flow_control_service->flow_control_node_state[i].flow_control_state == OPENAIS_FLOW_CONTROL_STATE_ENABLED) {
+				flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+				flow_control_service->flow_control_state_set_fn (flow_control_service->context, flow_control_service->flow_control_state);
+			}
+		}
+	} /* for list iteration */
+}
+
+static void flow_control_confchg_fn (
+	enum totem_configuration_type configuration_type,
+	unsigned int *member_list, int member_list_entries,
+	unsigned int *left_list, int left_list_entries,
+	unsigned int *joined_list, int joined_list_entries,
+	struct memb_ring_id *ring_id)
+{
+	unsigned int i;
+	struct flow_control_service *flow_control_service;
+	struct list_head *list;
+	/* Keep a copy of the membership; openais_flow_control_create reads it */
+	memcpy (flow_control_member_list, member_list,
+		sizeof (unsigned int) * member_list_entries);
+	flow_control_member_list_entries = member_list_entries;
+
+	for (list = flow_control_service_list_head.next;
+		list != &flow_control_service_list_head;
+		list = list->next) {
+
+		flow_control_service = list_entry (list, struct flow_control_service, list_all);
+		/*
+		 * Set all of the node ids after a configuration change and mark
+		 * every node and the service as flow control ENABLED.  NOTE(review):
+		 * this said "turn off" before, which the code does not do - confirm.
+		 */
+		flow_control_service->processor_count = flow_control_member_list_entries;
+		flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+		for (i = 0; i < member_list_entries; i++) {
+			flow_control_service->flow_control_node_state[i].nodeid = member_list[i];
+			flow_control_service->flow_control_node_state[i].flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+		}
+	}
+} 
+/*
+ * External API
+ */
+unsigned int openais_flow_control_initialize (void)
+{
+	unsigned int res;
+
+	log_init ("FLOW");
+
+	res = totempg_groups_initialize (
+		&flow_control_handle,
+		flow_control_deliver_fn,
+		flow_control_confchg_fn);
+
+	if (res == -1) {
+		log_printf (LOG_LEVEL_ERROR,
+			"Couldn't initialize flow control interface.\n");
+		return (-1);
+	}
+	res = totempg_groups_join (
+		flow_control_handle,
+		&flow_control_group,
+		1);
+
+	if (res == -1) {
+		log_printf (LOG_LEVEL_ERROR, "Couldn't join flow control group.\n");
+		return (-1);
+	}
+
+	return (0);
+}
+
+unsigned int openais_flow_control_ipc_init (
+	unsigned int *flow_control_handle,
+	unsigned int service)
+{
+	struct flow_control_instance *instance;
+	unsigned int res;
+
+	res = hdb_handle_create (&flow_control_hdb,
+		sizeof (struct flow_control_instance), flow_control_handle);
+	if (res != 0) {
+		goto error_exit;
+	}
+	res = hdb_handle_get (&flow_control_hdb, *flow_control_handle,
+		(void *)&instance);
+	if (res != 0) {
+		goto error_destroy;
+	}
+	instance->service = service;
+
+	list_init (&instance->list_head);
+
+	return (0);
+
+error_destroy:
+	hdb_handle_destroy (&flow_control_hdb, *flow_control_handle);
+error_exit:
+	return (-1);
+
+}
+
+unsigned int openais_flow_control_ipc_exit (
+	unsigned int flow_control_handle)
+{
+	hdb_handle_destroy (&flow_control_hdb, flow_control_handle);
+	return (0);
+}
+
+unsigned int openais_flow_control_create (
+	unsigned int flow_control_handle,
+	unsigned int service,
+	void *id,
+	unsigned int id_len,
+	void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state),
+	void *context)
+{
+	struct flow_control_service *flow_control_service;
+	struct flow_control_instance *instance;
+	unsigned int res;
+	unsigned int i;
+
+	/*
+	 * Register a service, identified by id/id_len, on this handle.  Returns
+	 * 0 on success, nonzero on a bad handle, an oversized id or no memory.
+	 */
+	res = hdb_handle_get (&flow_control_hdb, flow_control_handle, (void *)&instance);
+	if (res != 0) {
+		goto error_exit;
+	}
+	if (id_len > sizeof (flow_control_service->id)) {
+		res = -1; /* id would overflow the fixed size id buffer */
+		goto error_put;
+	}
+	flow_control_service = malloc (sizeof (struct flow_control_service));
+	if (flow_control_service == NULL) {
+		res = -1; /* do not report success when nothing was created */
+		goto error_put;
+	}
+	memset (flow_control_service, 0, sizeof (struct flow_control_service));
+
+	flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
+	flow_control_service->service = service;
+	memcpy (flow_control_service->id, id, id_len);
+	flow_control_service->id_len = id_len;
+	flow_control_service->flow_control_state_set_fn = flow_control_state_set_fn;
+	flow_control_service->context = context;
+	/* Link into the per-handle list and into the list of all services */
+	list_init (&flow_control_service->list);
+	list_add_tail (&instance->list_head, &flow_control_service->list);
+	list_init (&flow_control_service->list_all);
+	list_add_tail (&flow_control_service_list_head, &flow_control_service->list_all);
+	/* Start from the membership saved by the last configuration change */
+	flow_control_service->processor_count = flow_control_member_list_entries;
+	for (i = 0; i < flow_control_member_list_entries; i++) {
+		flow_control_service->flow_control_node_state[i].nodeid = flow_control_member_list[i];
+	}
+error_put:
+	hdb_handle_put (&flow_control_hdb, flow_control_handle);
+
+error_exit:
+	return (res);
+}
+
+unsigned int openais_flow_control_destroy (
+	unsigned int flow_control_identifier,
+	unsigned int service,
+	unsigned char *id,
+	unsigned int id_len)
+{
+	struct flow_control_service *flow_control_service;
+	struct flow_control_instance *instance;
+	struct list_head *list;
+	unsigned int res;
+	/*
+	 * Use the caller's handle; a bare flow_control_handle here would be
+	 * the file scope totempg handle rather than a flow_control_hdb handle
+	 */
+	res = hdb_handle_get (&flow_control_hdb, flow_control_identifier, (void *)&instance);
+	if (res != 0) {
+		goto error_exit;
+	}
+	/* Unlink and free the first service whose id matches; service is not compared */
+	for (list = flow_control_service_list_head.next;
+		list != &flow_control_service_list_head;
+		list = list->next) {
+		flow_control_service = list_entry (list, struct flow_control_service, list_all);
+		if ((flow_control_service->id_len == id_len) &&
+			(memcmp (flow_control_service->id, id, id_len) == 0)) {
+			list_del (&flow_control_service->list);
+			list_del (&flow_control_service->list_all);
+			free (flow_control_service);
+			break; /* done */
+		}
+	}
+	hdb_handle_put (&flow_control_hdb, flow_control_identifier);
+
+error_exit:
+	return (res);
+}
+/*
+ * If 1 is returned, flow control is enabled for this service and no
+ * new messages are allowed to be sent from the library for this connection
+ */
+unsigned int openais_flow_control_enabled (
+	unsigned int flow_control_handle)
+{
+	struct flow_control_instance *instance;
+	struct flow_control_service *flow_control_service;
+	struct list_head *list;
+	unsigned int res = 0;
+
+	res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
+		(void *)&instance);
+	if (res != 0) {
+		goto error_exit;
+	}
+
+	for (list = instance->list_head.next;
+		list != &instance->list_head;
+		list = list->next) {
+
+		flow_control_service = list_entry (list, struct flow_control_service, list);
+		if (flow_control_service->flow_control_state == OPENAIS_FLOW_CONTROL_STATE_ENABLED) {
+			res = 1;
+			break;
+		}
+	}
+	hdb_handle_put (&flow_control_hdb, flow_control_handle);
+
+error_exit:
+	return (res);
+}
+
+/*
+ * Set every service created on this handle to
+ * OPENAIS_FLOW_CONTROL_STATE_DISABLED and multicast that state
+ */
+unsigned int openais_flow_control_disable (
+	unsigned int flow_control_handle)
+{
+	struct flow_control_instance *instance;
+	struct flow_control_service *flow_control_service;
+	struct list_head *list;
+	unsigned int res;
+
+	res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
+		(void *)&instance);
+	if (res != 0) {
+		goto error_exit;
+	}
+
+	/* Mark every service on this handle DISABLED and multicast that */
+	/* state; the result of flow_control_xmit is not checked */
+	for (list = instance->list_head.next;
+		list != &instance->list_head;
+		list = list->next) {
+
+		flow_control_service = list_entry (list, struct flow_control_service, list);
+		flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
+		flow_control_xmit (flow_control_service, OPENAIS_FLOW_CONTROL_STATE_DISABLED);
+	}
+	hdb_handle_put (&flow_control_hdb, flow_control_handle);
+
+error_exit:
+	return (res);
+}
+
+/*
+ * Set every service created on this handle to
+ * OPENAIS_FLOW_CONTROL_STATE_ENABLED and multicast that state
+ */
+unsigned int openais_flow_control_enable (
+	unsigned int flow_control_handle)
+{
+	struct flow_control_instance *instance;
+	struct flow_control_service *entry;
+	struct list_head *iter;
+	unsigned int res;
+
+	res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
+		(void *)&instance);
+	if (res != 0) {
+		return (res);
+	}
+
+	/*
+	 * Flag every service on this handle ENABLED and multicast that state
+	 */
+	iter = instance->list_head.next;
+	while (iter != &instance->list_head) {
+		entry = list_entry (iter, struct flow_control_service, list);
+		entry->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+		flow_control_xmit (entry, OPENAIS_FLOW_CONTROL_STATE_ENABLED);
+		iter = iter->next;
+	}
+	hdb_handle_put (&flow_control_hdb, flow_control_handle);
+
+	return (res);
+}

Property changes on: exec/flow.c
___________________________________________________________________
Name: svn:executable
   + *

Index: exec/flow.h
===================================================================
--- exec/flow.h	(revision 0)
+++ exec/flow.h	(revision 1245)
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2006 Red Hat, Inc.
+ *
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake at mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef FLOW_H_DEFINED
+#define FLOW_H_DEFINED
+
+enum openais_flow_control_state {
+	OPENAIS_FLOW_CONTROL_STATE_DISABLED,
+	OPENAIS_FLOW_CONTROL_STATE_ENABLED
+};
+
+unsigned int openais_flow_control_initialize (void);
+
+unsigned int openais_flow_control_ipc_init (
+	unsigned int *flow_control_identifier,
+	unsigned int service);
+
+unsigned int openais_flow_control_ipc_exit (
+	unsigned int flow_control_identifier);
+
+unsigned int openais_flow_control_create (
+	unsigned int flow_control_handle,
+	unsigned int service,
+	void *id,
+	unsigned int id_len,
+	void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state),
+	void *context);
+
+unsigned int openais_flow_control_destroy (
+	unsigned int flow_control_identifier,
+	unsigned int service,
+	unsigned char *id,
+	unsigned int id_len);
+
+unsigned int openais_flow_control_enabled (
+	unsigned int flow_control_identifier);
+
+unsigned int openais_flow_control_disable (
+	unsigned int flow_control_identifier);
+
+unsigned int openais_flow_control_enable (
+	unsigned int flow_control_identifier);
+
+#endif /* FLOW_H_DEFINED */
Index: exec/service.h
===================================================================
--- exec/service.h	(revision 1244)
+++ exec/service.h	(revision 1245)
@@ -65,6 +65,7 @@
 	unsigned char *name;
 	unsigned short id;
 	unsigned int private_data_size;
+	enum openais_flow_control flow_control;
 	int (*lib_init_fn) (void *conn);
 	int (*lib_exit_fn) (void *conn);
 	struct openais_lib_handler *lib_service;
Index: exec/cpg.c
===================================================================
--- exec/cpg.c	(revision 1244)
+++ exec/cpg.c	(revision 1245)
@@ -66,6 +66,7 @@
 #include "jhash.h"
 #include "swab.h"
 #include "ipc.h"
+#include "flow.h"
 #include "print.h"
 
 #define GROUP_HASH_SIZE 32
@@ -102,6 +103,7 @@
 	void *conn;
 	void *trackerconn;
 	struct group_info *group;
+	enum openais_flow_control_state flow_control_state;
 	struct list_head list; /* on the group_info members list */
 };
 
@@ -237,6 +239,7 @@
 	.name				        = (unsigned char*)"openais cluster closed process group service v1.01",
 	.id					= CPG_SERVICE,
 	.private_data_size			= sizeof (struct process_info),
+	.flow_control				= OPENAIS_FLOW_CONTROL_REQUIRED,
 	.lib_init_fn				= cpg_lib_init_fn,
 	.lib_exit_fn				= cpg_lib_exit_fn,
 	.lib_service				= cpg_lib_service,
@@ -304,6 +307,8 @@
 	mar_cpg_name_t group_name __attribute__((aligned(8)));
 	mar_uint32_t msglen __attribute__((aligned(8)));
 	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t flow_control_state __attribute__((aligned(8)));
+	mar_message_source_t source __attribute__((aligned(8)));
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
@@ -608,6 +613,15 @@
 	}
 }
 
+static void cpg_flow_control_state_set_fn (
+	void *context,
+	enum openais_flow_control_state flow_control_state)
+{
+	struct process_info *process_info = (struct process_info *)context;
+
+	process_info->flow_control_state = flow_control_state;
+}
+
 /* Can byteswap join & leave messages */
 static void exec_cpg_procjoin_endian_convert (void *msg)
 {
@@ -640,7 +654,8 @@
 	swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
 	req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
 	req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
-
+	req_exec_cpg_mcast->flow_control_state = swab32(req_exec_cpg_mcast->flow_control_state);
+	swab_mar_message_source_t (&req_exec_cpg_mcast->source);
 }
 
 static void do_proc_join(
@@ -787,6 +802,12 @@
 	struct group_info *gi;
 	struct list_head *iter;
 
+	/*
+	 * Track local messages so that flow is controlled on the local node
+	 */
+	if (message_source_is_local (&req_exec_cpg_mcast->source)) {
+		openais_ipc_flow_control_local_decrement (req_exec_cpg_mcast->source.conn);
+	}
 	gi = get_group(&req_exec_cpg_mcast->group_name); /* this will always succeed ! */
 	assert(gi);
 
@@ -796,6 +817,7 @@
 	res_lib_cpg_mcast->msglen = msglen;
 	res_lib_cpg_mcast->pid = req_exec_cpg_mcast->pid;
 	res_lib_cpg_mcast->nodeid = nodeid;
+	res_lib_cpg_mcast->flow_control_state = 0;
 	memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
 		sizeof(mar_cpg_name_t));
 	memcpy(&res_lib_cpg_mcast->message, (char*)message+sizeof(*req_exec_cpg_mcast),
@@ -911,6 +933,14 @@
 		goto join_err;
 	}
 
+	openais_ipc_flow_control_create (
+		conn,
+		CPG_SERVICE,
+		req_lib_cpg_join->group_name.value,
+		req_lib_cpg_join->group_name.length,
+		cpg_flow_control_state_set_fn,
+		pi);
+
 	/* Add a node entry for us */
 	pi->nodeid = this_ip->nodeid;
 	pi->pid = req_lib_cpg_join->pid;
@@ -948,6 +978,12 @@
 	cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_LEAVE);
 	pi->group = NULL;
 
+	openais_ipc_flow_control_destroy (
+		conn,
+		CPG_SERVICE,
+		gi->group_name.value,
+		gi->group_name.length);
+
 leave_ret:
 	/* send return */
 	res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
@@ -984,6 +1020,8 @@
 		MESSAGE_REQ_EXEC_CPG_MCAST);
 	req_exec_cpg_mcast.pid = pi->pid;
 	req_exec_cpg_mcast.msglen = msglen;
+	req_exec_cpg_mcast.flow_control_state = pi->flow_control_state;
+	message_source_set (&req_exec_cpg_mcast.source, conn);
 	memcpy(&req_exec_cpg_mcast.group_name, &gi->group_name,
 		sizeof(mar_cpg_name_t));
 
@@ -994,6 +1032,7 @@
 
 	// TODO: guarantee type...
 	result = totempg_groups_mcast_joined (openais_group_handle, req_exec_cpg_iovec, 2, TOTEMPG_AGREED);
+	openais_ipc_flow_control_local_increment (conn);
 
 	res.size = sizeof(res);
 	res.id = MESSAGE_RES_CPG_MCAST;
Index: exec/clm.c
===================================================================
--- exec/clm.c	(revision 1244)
+++ exec/clm.c	(revision 1245)
@@ -202,6 +202,7 @@
 	.name			= (unsigned char*)"openais cluster membership service B.01.01",
 	.id			= CLM_SERVICE,
 	.private_data_size	= sizeof (struct clm_pd),
+	.flow_control		= OPENAIS_FLOW_CONTROL_NOT_REQUIRED, 
 	.lib_init_fn		= clm_lib_init_fn,
 	.lib_exit_fn		= clm_lib_exit_fn,
 	.lib_service		= clm_lib_service,
Index: exec/ipc.c
===================================================================
--- exec/ipc.c	(revision 1244)
+++ exec/ipc.c	(revision 1245)
@@ -67,6 +67,7 @@
 #include "totemconfig.h"
 #include "main.h"
 #include "ipc.h"
+#include "flow.h"
 #include "service.h"
 #include "sync.h"
 #include "swab.h"
@@ -80,10 +81,25 @@
 
 #define SERVER_BACKLOG 5
 
+/*
+ * When fewer than this many free entries remain in a queue, turn on flow control
+ */
+#define FLOW_CONTROL_ENTRIES_ENABLE 400
+
+/*
+ * When a queue has drained to this many entries or fewer, turn off flow control
+ */
+#define FLOW_CONTROL_ENTRIES_DISABLE 64
+
+
 static unsigned int g_gid_valid = 0;
 
 static struct totem_ip_address *my_ip;
 
+static totempg_groups_handle ipc_handle;
+
+DECLARE_LIST_INIT (conn_info_list_head);
+
 static void (*ipc_serialize_lock_fn) (void);
 
 static void (*ipc_serialize_unlock_fn) (void);
@@ -117,16 +133,22 @@
 	int authenticated;	/* Is this connection authenticated? */
 	void *private_data;	/* library connection private data */
 	struct conn_info *conn_info_partner;	/* partner connection dispatch<->response */
+	unsigned int flow_control_handle;	/* flow control identifier */
+	unsigned int flow_control_enabled;	/* flow control enabled bit */
+	unsigned int flow_control_local_count;	/* flow control local count */
+	enum openais_flow_control flow_control;	/* Does this service use IPC flow control */
+	pthread_mutex_t flow_control_mutex;
         int (*lib_exit_fn) (void *conn);
 	struct timerlist timerlist;
 	pthread_mutex_t mutex;
 	pthread_mutex_t *shared_mutex;
-
+	struct list_head list;
 };
 
 static void *prioritized_poll_thread (void *conn);
 static int conn_info_outq_flush (struct conn_info *conn_info);
 static void libais_deliver (struct conn_info *conn_info);
+static void ipc_flow_control (struct conn_info *conn_info);
 
  /*
   * IPC Initializers
@@ -245,6 +267,15 @@
 	conn_info->conn_info_partner->state = CONN_STATE_ACTIVE;
 	conn_info->lib_exit_fn = ais_service[conn_info->service]->lib_exit_fn;
 	ais_service[conn_info->service]->lib_init_fn (conn_info);
+
+	conn_info->flow_control = ais_service[conn_info->service]->flow_control;
+	conn_info->conn_info_partner->flow_control = ais_service[conn_info->service]->flow_control;
+	if (ais_service[conn_info->service]->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) {
+		openais_flow_control_ipc_init (
+			&conn_info->flow_control_handle,
+			conn_info->service);
+
+	}
 	return (0);
 }
 
@@ -283,6 +314,7 @@
 	}
 
 	pthread_mutex_init (&conn_info->mutex, NULL);
+	pthread_mutex_init (&conn_info->flow_control_mutex, NULL);
 	pthread_mutex_init (conn_info->shared_mutex, NULL);
 
 	conn_info->state = CONN_STATE_ACTIVE;
@@ -290,6 +322,9 @@
 	conn_info->events = POLLIN|POLLNVAL;
 	conn_info->service = SOCKET_SERVICE_INIT;
 
+	list_init (&conn_info->list);
+	list_add (&conn_info_list_head, &conn_info->list);
+
 	pthread_attr_init (&conn_info->thread_attr);
 	pthread_attr_setstacksize (&conn_info->thread_attr, 200000);
 	pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_DETACHED);
@@ -316,6 +351,7 @@
 	if (conn_info->conn_info_partner) {
 		conn_info->conn_info_partner->conn_info_partner = NULL;
 	}
+	list_del (&conn_info->list);
 	free (conn_info);
 }
 
@@ -372,6 +408,9 @@
 	}
 	conn_info->state = CONN_STATE_DISCONNECTED;
 	conn_info->conn_info_partner->state = CONN_STATE_DISCONNECTED;
+	if (conn_info->flow_control_enabled == 1) {
+		openais_flow_control_disable (conn_info->flow_control_handle);
+	}
 	return (0);
 }
 
@@ -487,6 +526,9 @@
 			if ((ufd.revents & POLLIN) == POLLIN) {
 				libais_deliver (conn_info);
 			}
+
+			ipc_flow_control (conn_info);
+
 		}
 
 		ipc_serialize_unlock_fn ();
@@ -512,6 +554,42 @@
 #endif
 
 
+static void ipc_flow_control (struct conn_info *conn_info)
+{
+	unsigned int entries_used;
+	unsigned int entries_usedhw;
+
+	entries_used = queue_used (&conn_info->outq);
+	if (conn_info->flow_control_local_count > entries_used) {
+		entries_used = conn_info->flow_control_local_count;
+	}
+	/*
+	 * IPC group-wide flow control
+	 */
+	if (conn_info->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) {
+		if (conn_info->flow_control_enabled == 0 &&
+			((entries_used + FLOW_CONTROL_ENTRIES_ENABLE) > SIZEQUEUE)) {
+
+			entries_usedhw = queue_usedhw (&conn_info->outq);
+			log_printf (LOG_LEVEL_NOTICE, "Enabling flow control - HW mark %d of %d %p.\n", entries_usedhw, SIZEQUEUE, &conn_info->outq);
+			openais_flow_control_enable (conn_info->flow_control_handle);
+			conn_info->flow_control_enabled = 1;
+			conn_info->conn_info_partner->flow_control_enabled = 1;
+		}
+		if (conn_info->flow_control_enabled == 1 &&
+
+			entries_used <= FLOW_CONTROL_ENTRIES_DISABLE) {
+			entries_usedhw = queue_usedhw (&conn_info->outq);
+
+			log_printf (LOG_LEVEL_NOTICE, "Disabling flow control - HW mark [%d/%d].\n",
+				entries_usedhw, SIZEQUEUE);
+			openais_flow_control_disable (conn_info->flow_control_handle);
+			conn_info->flow_control_enabled = 0;
+			conn_info->conn_info_partner->flow_control_enabled = 0;
+		}
+	}
+}
+
 static int conn_info_outq_flush (struct conn_info *conn_info) {
 	struct queue *outq;
 	int res = 0;
@@ -574,6 +652,7 @@
 	if (queue_is_empty (outq)) {
 		conn_info->events = POLLIN|POLLNVAL;
 	}
+
 	return (0);
 }
 
@@ -632,7 +711,9 @@
 
 	iov_recv.iov_base = &conn_info->inb[conn_info->inb_start];
 	iov_recv.iov_len = (SIZEINB) - conn_info->inb_start;
-	assert (iov_recv.iov_len != 0);
+	if (conn_info->inb_inuse == SIZEINB) {
+		return;
+	}
 
 retry_recv:
 	res = recvmsg (conn_info->fd, &msg_recv, MSG_NOSIGNAL);
@@ -714,6 +795,16 @@
 				(send_ok_joined) &&
 				(sync_in_process() == 0)));
 
+			/*
+			 * Check if flow control on new messages is enabled
+			 * for this service
+			 */
+			if ((send_ok == 1) &&
+				((ais_service[conn_info->service]->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED &&
+				openais_flow_control_enabled (conn_info->flow_control_handle) == 1)) || (conn_info->flow_control_enabled == 1) || (conn_info->conn_info_partner->flow_control_enabled == 1)) {
+				send_ok = 0;	
+			}
+
 			if (send_ok) {
 				ais_service[service]->lib_service[header->id].lib_handler_fn(conn_info, header);
 			} else {
@@ -831,6 +922,29 @@
 	source->conn = conn;
 }
 
+static void ipc_confchg_fn (
+	enum totem_configuration_type configuration_type,
+	unsigned int *member_list, int member_list_entries,
+	unsigned int *left_list, int left_list_entries,
+	unsigned int *joined_list, int joined_list_entries,
+	struct memb_ring_id *ring_id)
+{
+	struct conn_info *conn_info;
+	struct list_head *list;
+
+	/*
+	 * Turn on flow control enabled flag for all connections
+	 */
+	for (list = conn_info_list_head.next;
+		list != &conn_info_list_head;
+		list = list->next) {
+
+		conn_info = list_entry (list, struct conn_info, list);
+		conn_info->flow_control_enabled = 1;
+		conn_info->conn_info_partner->flow_control_enabled = 1;
+	}
+}
+
 void openais_ipc_init (
 	void (*serialize_lock_fn) (void),
 	void (*serialize_unlock_fn) (void),
@@ -893,6 +1007,15 @@
 	g_gid_valid = gid_valid;
 
 	my_ip = my_ip_in;
+
+	/*
+	 * Reset the internal state of flow control when a
+	 * configuration change occurs
+	 */
+	res = totempg_groups_initialize (
+		&ipc_handle,
+		NULL,
+		ipc_confchg_fn);
 }
 
 
@@ -947,6 +1070,9 @@
 	if (!libais_connection_active (conn_info)) {
 		return (-1);
 	}
+
+	ipc_flow_control (conn_info);
+
 	outq = &conn_info->outq;
 
 	msg_send.msg_iov = &iov_send;
@@ -1095,3 +1221,62 @@
 
 	timerlist_del (&conn_info->timerlist, timer_handle);
 }
+
+void openais_ipc_flow_control_create (
+	void *conn,
+	unsigned int service,
+	char *id,
+	int id_len,
+	void (*flow_control_state_set_fn) (void *conn, enum openais_flow_control_state),
+	void *context)
+{
+	struct conn_info *conn_info = (struct conn_info *)conn;
+
+	openais_flow_control_create (
+		conn_info->flow_control_handle,
+		service,
+		id,
+		id_len,
+		flow_control_state_set_fn,
+		context);	
+	conn_info->conn_info_partner->flow_control_handle = conn_info->flow_control_handle;
+}
+
+void openais_ipc_flow_control_destroy (
+	void *conn,
+	unsigned int service,
+	char *id,
+	int id_len)
+{
+	struct conn_info *conn_info = (struct conn_info *)conn;
+
+	openais_flow_control_destroy (
+		conn_info->flow_control_handle,
+		service,
+		id,
+		id_len);
+}
+
+void openais_ipc_flow_control_local_increment (
+        void *conn)
+{
+	struct conn_info *conn_info = (struct conn_info *)conn;
+
+	pthread_mutex_lock (&conn_info->flow_control_mutex);
+
+	conn_info->flow_control_local_count++;
+
+	pthread_mutex_unlock (&conn_info->flow_control_mutex);
+}
+
+void openais_ipc_flow_control_local_decrement (
+        void *conn)
+{
+	struct conn_info *conn_info = (struct conn_info *)conn;
+
+	pthread_mutex_lock (&conn_info->flow_control_mutex);
+
+	conn_info->flow_control_local_count--;
+
+	pthread_mutex_unlock (&conn_info->flow_control_mutex);
+}
Index: exec/ipc.h
===================================================================
--- exec/ipc.h	(revision 1244)
+++ exec/ipc.h	(revision 1245)
@@ -36,6 +36,7 @@
 #define IPC_H_DEFINED
 
 #include "tlist.h"
+#include "flow.h"
 
 extern void message_source_set (mar_message_source_t *source, void *conn);
 
@@ -68,4 +69,24 @@
 	void *conn,
 	timer_handle timer_handle);
 
+extern void openais_ipc_flow_control_create (
+	void *conn,
+	unsigned int service,
+	char *id,
+	int id_len,
+	void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state_set),
+	void *context);
+	
+extern void openais_ipc_flow_control_destroy (
+	void *conn,
+	unsigned int service,
+	char *id,
+	int id_len);
+
+extern void openais_ipc_flow_control_local_increment (
+	void *conn);
+
+extern void openais_ipc_flow_control_local_decrement (
+	void *conn);
+
 #endif /* IPC_H_DEFINED */
Index: exec/amfcomp.c
===================================================================
--- exec/amfcomp.c	(revision 1244)
+++ exec/amfcomp.c	(revision 1245)
@@ -1411,6 +1411,7 @@
 			assert (0);
 			break;
 	}
+	return 0;
 }
 
 /**
@@ -1687,4 +1688,6 @@
 	}
 
 	assert (0);
+	/* XXX we fall through to here when NDEBUG is set (assert compiled out) */
+	return -1;
 }
Index: exec/evt.c
===================================================================
--- exec/evt.c	(revision 1244)
+++ exec/evt.c	(revision 1245)
@@ -215,6 +215,7 @@
 								(unsigned char*)"openais event service B.01.01",
 	.id							= EVT_SERVICE,
 	.private_data_size			= sizeof (struct libevt_pd),
+	.flow_control				= OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
 	.lib_init_fn				= evt_lib_init,
 	.lib_exit_fn				= evt_lib_exit,
 	.lib_service				= evt_lib_service,
Index: exec/ckpt.c
===================================================================
--- exec/ckpt.c	(revision 1244)
+++ exec/ckpt.c	(revision 1245)
@@ -545,6 +545,7 @@
 	.name				= (unsigned char *)"openais checkpoint service B.01.01",
 	.id				= CKPT_SERVICE,
 	.private_data_size		= sizeof (struct ckpt_pd),
+	.flow_control			= OPENAIS_FLOW_CONTROL_NOT_REQUIRED, 
 	.lib_init_fn			= ckpt_lib_init_fn,
 	.lib_exit_fn			= ckpt_lib_exit_fn,
 	.lib_service			= ckpt_lib_service,
Index: exec/amf.c
===================================================================
--- exec/amf.c	(revision 1244)
+++ exec/amf.c	(revision 1245)
@@ -266,6 +266,7 @@
 	.name				= (unsigned char *)"openais availability management framework B.01.01",
 	.id					= AMF_SERVICE,
 	.private_data_size	= sizeof (struct amf_pd),
+	.flow_control		= OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
 	.lib_init_fn		= amf_lib_init_fn,
 	.lib_exit_fn		= amf_lib_exit_fn,
 	.lib_service		= amf_lib_service,
Index: exec/lck.c
===================================================================
--- exec/lck.c	(revision 1244)
+++ exec/lck.c	(revision 1245)
@@ -283,6 +283,7 @@
 	.name				= (unsigned char*)"openais distributed locking service B.01.01",
 	.id				= LCK_SERVICE,
 	.private_data_size		= sizeof (struct lck_pd),
+	.flow_control			= OPENAIS_FLOW_CONTROL_NOT_REQUIRED, 
 	.lib_init_fn			= lck_lib_init_fn,
 	.lib_exit_fn			= lck_lib_exit_fn,
 	.lib_service			= lck_lib_service,
Index: exec/main.c
===================================================================
--- exec/main.c	(revision 1244)
+++ exec/main.c	(revision 1245)
@@ -75,6 +75,7 @@
 #include "timer.h"
 #include "print.h"
 #include "util.h"
+#include "flow.h"
 #include "version.h"
 
 #define SERVER_BACKLOG 5
@@ -538,6 +539,9 @@
 	sync_register (openais_sync_callbacks_retrieve, openais_sync_completed,
 		totem_config.vsf_type);
 
+
+	res = openais_flow_control_initialize ();
+
 	/*
 	 * Drop root privleges to user 'ais'
 	 * TODO: Don't really need full root capabilities;
Index: exec/msg.c
===================================================================
--- exec/msg.c	(revision 1244)
+++ exec/msg.c	(revision 1245)
@@ -436,6 +436,7 @@
 	.name				= (unsigned char *)"openais message service B.01.01",
 	.id				= MSG_SERVICE,
 	.private_data_size		= sizeof (struct msg_pd),
+	.flow_control			= OPENAIS_FLOW_CONTROL_NOT_REQUIRED, 
 	.lib_init_fn			= msg_lib_init_fn,
 	.lib_exit_fn			= msg_lib_exit_fn,
 	.lib_service			= msg_lib_service,
Index: exec/main.h
===================================================================
--- exec/main.h	(revision 1244)
+++ exec/main.h	(revision 1245)
@@ -46,7 +46,7 @@
  * Size of the queue (entries) for I/O's to the API over socket IPC.
  */
 
-#define SIZEQUEUE 256
+#define SIZEQUEUE 800
 
 #define SOCKET_SERVICE_INIT 254
 
Index: lib/cpg.c
===================================================================
--- lib/cpg.c	(revision 1244)
+++ lib/cpg.c	(revision 1245)
@@ -55,6 +55,7 @@
 	int response_fd;
 	int dispatch_fd;
 	int finalize;
+	cpg_flow_control_state_t flow_control_state;
 	cpg_callbacks_t callbacks;
 	pthread_mutex_t response_mutex;
 	pthread_mutex_t dispatch_mutex;
@@ -306,6 +307,7 @@
 		case MESSAGE_RES_CPG_DELIVER_CALLBACK:
 			res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)&dispatch_data;
 
+			cpg_inst->flow_control_state = res_cpg_deliver_callback->flow_control_state;
 			marshall_from_mar_cpg_name_t (
 				&group_name,
 				&res_cpg_deliver_callback->group_name);
@@ -352,7 +354,6 @@
 				res_cpg_confchg_callback->joined_list_entries);
 			break;
 
-
 		default:
 			error = SA_AIS_ERR_LIBRARY;
 			goto error_nounlock;
@@ -536,6 +537,10 @@
 		goto error_exit;
 	}
 
+	cpg_inst->flow_control_state = CPG_FLOW_CONTROL_DISABLED;
+	if (res_lib_cpg_mcast.error == CPG_ERR_TRY_AGAIN) {
+		cpg_inst->flow_control_state = CPG_FLOW_CONTROL_ENABLED;
+	}
 	error = res_lib_cpg_mcast.error;
 
 error_exit:
@@ -600,4 +605,22 @@
 	return (error);
 }
 
+cpg_error_t cpg_flow_control_state_get (
+	cpg_handle_t handle,
+	cpg_flow_control_state_t *flow_control_state)
+{
+	cpg_error_t error;
+	struct cpg_inst *cpg_inst;
+
+	error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
+	if (error != SA_AIS_OK) {
+		return (error);
+	}
+
+	*flow_control_state = cpg_inst->flow_control_state;
+
+	saHandleInstancePut (&cpg_handle_t_db, handle);
+
+	return (error);
+}
 /** @} */


Index: openais.spec
===================================================================
RCS file: /cvs/dist/rpms/openais/devel/openais.spec,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- openais.spec	16 Aug 2006 04:45:44 -0000	1.12
+++ openais.spec	25 Sep 2006 17:34:06 -0000	1.13
@@ -1,12 +1,15 @@
 Name: openais
 Summary: The openais Standards-Based Cluster Framework executive and APIs
 Version: 0.80.1
-Release: 1.0
+Release: 1.1
 License: BSD
 Group: System Environment/Base
 URL: http://developer.osdl.org/dev/openais/
 Source0: http://developer.osdl.org/dev/openais/downloads/openais-%{version}/openais-%{version}.tar.gz
 Patch0: openais-0.76-defaultconfig.patch
+Patch1: revision-1223.patch
+Patch2: revision-1230.patch
+Patch3: revision-1245.patch
 BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
 ExclusiveArch: i386 ppc x86_64 ppc64 ia64 s390 s390x
 Requires(pre): /usr/sbin/useradd
@@ -30,10 +33,16 @@
 
 %prep
 %setup -q -n openais-%{version}
+%patch0
+%patch1
+%patch2 
+%patch3
 
 %build
 # -O3 required for performance reasons
-CFLAGS="$(echo '%{optflags}' | sed -e 's/-O[0-9]*//') -O3"
+# For now, do not compile with -O3 so that we get proper debug output
+#CFLAGS="$(echo '%{optflags}' | sed -e 's/-O[0-9]*//') -O3"
+CFLAGS="$(echo '%{optflags}')"
 make CFLAGS="$CFLAGS"
 
 %install
@@ -56,6 +65,7 @@
 
 %post
 /sbin/chkconfig --add openais || :
+/sbin/ldconfig > /dev/null
 
 %preun
 if [ $1 -eq 0 ]; then
@@ -65,10 +75,7 @@
 
 %postun
 [ "$1" -ge "1" ] && %{_initrddir}/openais condrestart &>/dev/null || :
-
-%post devel -p /sbin/ldconfig
-
-%postun devel -p /sbin/ldconfig
+/sbin/ldconfig > /dev/null
 
 %files 
 %defattr(-,root,root,-)
@@ -163,6 +170,13 @@
 %{_mandir}/man3/evs_membership_get.3*
 
 %changelog
+* Mon Sep 25 2006 Steven Dake <sdake at redhat.com> - 0.80.1-1.1
+- Add upstream revision 1223 - Fix checkpoint write size of zero to
+  return INVALID_PARAM error code.
+- Add upstream revision 1230 - Add missing include for assert.h.
+- Add upstream revision 1245 - Add cpgbench tool and better flow control system.
+- Move /sbin/ldconfig into regular package from devel package.
+
 * Tue Aug 15 2006 Steven Dake <sdake at redhat.com> - 0.80.1-1.0
 - New stable upstream release
 




More information about the fedora-cvs-commits mailing list