rpms/openais/devel revision-1223.patch, NONE, 1.1 revision-1230.patch, NONE, 1.1 revision-1245.patch, NONE, 1.1 openais.spec, 1.12, 1.13
fedora-cvs-commits at redhat.com
fedora-cvs-commits at redhat.com
Mon Sep 25 17:34:13 UTC 2006
- Previous message (by thread): rpms/xorg-x11-xinit/devel Xclients, 1.2, 1.3 xinitrc, 1.2, 1.3 xinitrc-common, 1.1, 1.2 xorg-x11-xinit.spec, 1.29, 1.30
- Next message (by thread): rpms/glibc/devel .cvsignore, 1.177, 1.178 glibc-fedora.patch, 1.185, 1.186 glibc.spec, 1.272, 1.273 sources, 1.201, 1.202
- Messages sorted by:
[ date ]
[ thread ]
[ subject ]
[ author ]
Author: sdake
Update of /cvs/dist/rpms/openais/devel
In directory cvs.devel.redhat.com:/tmp/cvs-serv25857/devel
Modified Files:
openais.spec
Added Files:
revision-1223.patch revision-1230.patch revision-1245.patch
Log Message:
auto-import openais-0.80.1-1.1 on branch devel from openais-0.80.1-1.1.src.rpm
revision-1223.patch:
ckpt.c | 2 ++
1 files changed, 2 insertions(+)
--- NEW FILE revision-1223.patch ---
Index: lib/ckpt.c
===================================================================
--- lib/ckpt.c (revision 1222)
+++ lib/ckpt.c (revision 1223)
@@ -1500,10 +1500,12 @@
for (i = 0; i < numberOfElements; i++) {
if (ioVector[i].dataSize == 0) {
*erroneousVectorIndex = i;
+ error = SA_AIS_ERR_INVALID_PARAM;
goto error_put;
}
if (ioVector[i].dataBuffer == NULL) {
*erroneousVectorIndex = i;
+ error = SA_AIS_ERR_INVALID_PARAM;
goto error_put;
}
}
revision-1230.patch:
amfsg.c | 1 +
1 files changed, 1 insertion(+)
--- NEW FILE revision-1230.patch ---
Index: exec/amfsg.c
===================================================================
--- exec/amfsg.c (revision 1229)
+++ exec/amfsg.c (revision 1230)
@@ -141,6 +141,7 @@
#include <stdlib.h>
#include <errno.h>
+#include <assert.h>
#include "amf.h"
#include "print.h"
revision-1245.patch:
exec/Makefile | 4
exec/amf.c | 1
exec/amfcomp.c | 3
exec/cfg.c | 1
exec/ckpt.c | 1
exec/clm.c | 1
exec/cpg.c | 41 ++++
exec/evs.c | 1
exec/evt.c | 1
exec/flow.c | 463 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
exec/flow.h | 76 ++++++++
exec/ipc.c | 189 +++++++++++++++++++++-
exec/ipc.h | 21 ++
exec/lck.c | 1
exec/main.c | 4
exec/main.h | 2
exec/msg.c | 1
exec/service.h | 1
include/cpg.h | 10 +
include/hdb.h | 2
include/ipc_cpg.h | 5
include/queue.h | 12 +
lib/cpg.c | 25 ++
test/Makefile | 5
test/cpgbench.c | 175 ++++++++++++++++++++
25 files changed, 1033 insertions(+), 13 deletions(-)
--- NEW FILE revision-1245.patch ---
Index: test/cpgbench.c
===================================================================
--- test/cpgbench.c (revision 0)
+++ test/cpgbench.c (revision 1245)
@@ -0,0 +1,175 @@
+#define _BSD_SOURCE
+/*
+ * Copyright (c) 2006 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake at mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <unistd.h>
+#include <errno.h>
+#include <unistd.h>
+#include <time.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/un.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "saAis.h"
+#include "cpg.h"
+
+int alarm_notice;
+
+void cpg_bm_confchg_fn (
+ cpg_handle_t handle,
+ struct cpg_name *group_name,
+ struct cpg_address *member_list, int member_list_entries,
+ struct cpg_address *left_list, int left_list_entries,
+ struct cpg_address *joined_list, int joined_list_entries)
+{
+}
+
+unsigned int write_count;
+
+void cpg_bm_deliver_fn (
+ cpg_handle_t handle,
+ struct cpg_name *group_name,
+ uint32_t nodeid,
+ uint32_t pid,
+ void *msg,
+ int msg_len)
+{
+ write_count++;
+}
+
+cpg_callbacks_t callbacks = {
+ .cpg_deliver_fn = cpg_bm_deliver_fn,
+ .cpg_confchg_fn = cpg_bm_confchg_fn
+};
+
+char data[500000];
+
+void cpg_benchmark (
+ cpg_handle_t handle,
+ int write_size)
+{
+ struct timeval tv1, tv2, tv_elapsed;
+ struct iovec iov;
+ unsigned int res;
+ cpg_flow_control_state_t flow_control_state;
+
+ alarm_notice = 0;
+ iov.iov_base = data;
+ iov.iov_len = write_size;
+
+ write_count = 0;
+ alarm (10);
+
+ gettimeofday (&tv1, NULL);
+ do {
+ /*
+ * Test checkpoint write
+ */
+ cpg_flow_control_state_get (handle, &flow_control_state);
+ if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
+retry:
+ res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
+ if (res == CPG_ERR_TRY_AGAIN) {
+ goto retry;
+ }
+ }
+ res = cpg_dispatch (handle, CPG_DISPATCH_ALL);
+ if (res != CPG_OK) {
+ printf ("cpg dispatch returned error %d\n", res);
+ exit (1);
+ }
+ } while (alarm_notice == 0);
+ gettimeofday (&tv2, NULL);
+ timersub (&tv2, &tv1, &tv_elapsed);
+
+ printf ("%5d messages received ", write_count);
+ printf ("%5d bytes per write ", write_size);
+ printf ("%7.3f Seconds runtime ",
+ (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
+ printf ("%9.3f TP/s ",
+ ((float)write_count) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
+ printf ("%7.3f MB/s.\n",
+ ((float)write_count) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
+}
+
+void sigalrm_handler (int num)
+{
+ alarm_notice = 1;
+}
+
+static struct cpg_name group_name = {
+ .value = "cpg_bm",
+ .length = 6
+};
+
+int main (void) {
+ cpg_handle_t handle;
+ unsigned int size = 1;
+ int i;
+ unsigned int res;
+
+ signal (SIGALRM, sigalrm_handler);
+ res = cpg_initialize (&handle, &callbacks);
+ if (res != CPG_OK) {
+ printf ("cpg_initialize failed with result %d\n", res);
+ exit (1);
+ }
+
+ res = cpg_join (handle, &group_name);
+ if (res != CPG_OK) {
+ printf ("cpg_join failed with result %d\n", res);
+ exit (1);
+ }
+
+ for (i = 0; i < 50; i++) { /* number of repetitions - up to 50k */
+ cpg_benchmark (handle, size);
+ size += 1000;
+ }
+
+ res = cpg_finalize (handle);
+ if (res != CPG_OK) {
+ printf ("cpg_join failed with result %d\n", res);
+ exit (1);
+ }
+ return (0);
+}
Index: test/Makefile
===================================================================
--- test/Makefile (revision 1244)
+++ test/Makefile (revision 1245)
@@ -50,7 +50,7 @@
testckpt ckptstress ckptbench \
ckptbenchth ckpt-rd ckpt-wr testevt testevs \
evsbench subscription publish evtbench unlink testclm2 testlck \
- testmsg testcpg openais-cfgtool
+ testmsg testcpg cpgbench openais-cfgtool
testtimer: testtimer.o $(LIBRARIES)
$(CC) $(LDFLAGS) -o testtimer testtimer.o ../exec/timer.o
@@ -139,6 +139,9 @@
testcpg: testcpg.o $(LIBRARIES)
$(CC) $(LDFLAGS) -o testcpg testcpg.o $(LIBS)
+cpgbench: cpgbench.o $(LIBRARIES)
+ $(CC) $(LDFLAGS) -o cpgbench cpgbench.o $(LIBS)
+
openais-cfgtool: openais-cfgtool.o $(LIBRARIES)
$(CC) $(LDFLAGS) -o openais-cfgtool openais-cfgtool.o $(LIBS)
Index: include/hdb.h
===================================================================
--- include/hdb.h (revision 1244)
+++ include/hdb.h (revision 1245)
@@ -1,5 +1,6 @@
/*
* Copyright (c) 2002-2006 MontaVista Software, Inc.
+ * Copyright (c) 2006 Red Hat, Inc.
*
* All rights reserved.
*
@@ -196,7 +197,6 @@
handle_database,
handle_database->iterator,
instance);
-
handle_database->iterator += 1;
if (res == 0) {
Index: include/ipc_cpg.h
===================================================================
--- include/ipc_cpg.h (revision 1244)
+++ include/ipc_cpg.h (revision 1245)
@@ -56,7 +56,8 @@
MESSAGE_RES_CPG_CONFCHG_CALLBACK = 4,
MESSAGE_RES_CPG_DELIVER_CALLBACK = 5,
MESSAGE_RES_CPG_TRACKSTART = 6,
- MESSAGE_RES_CPG_TRACKSTOP = 7
+ MESSAGE_RES_CPG_TRACKSTOP = 7,
+ MESSAGE_RES_CPG_FLOW_CONTROL_STATE_SET = 8
};
enum lib_cpg_confchg_reason {
@@ -111,6 +112,7 @@
mar_uint32_t msglen __attribute__((aligned(8)));
mar_uint32_t nodeid __attribute__((aligned(8)));
mar_uint32_t pid __attribute__((aligned(8)));
+ mar_uint32_t flow_control_state __attribute__((aligned(8)));
mar_uint8_t message[] __attribute__((aligned(8)));
};
@@ -140,5 +142,4 @@
mar_res_header_t header __attribute__((aligned(8)));
};
-
#endif /* IPC_CPG_H_DEFINED */
Index: include/cpg.h
===================================================================
--- include/cpg.h (revision 1244)
+++ include/cpg.h (revision 1245)
@@ -57,6 +57,11 @@
} cpg_guarantee_t;
typedef enum {
+ CPG_FLOW_CONTROL_DISABLED, /* flow control is disabled - new messages may be sent */
+ CPG_FLOW_CONTROL_ENABLED /* flow control is enabled - new messages should not be sent */
+} cpg_flow_control_state_t;
+
+typedef enum {
CPG_OK = 1,
CPG_ERR_LIBRARY = 2,
CPG_ERR_TIMEOUT = 5,
@@ -102,7 +107,6 @@
void *msg,
int msg_len);
-
typedef void (*cpg_confchg_fn_t) (
cpg_handle_t handle,
struct cpg_name *group_name,
@@ -183,4 +187,8 @@
struct cpg_address *member_list,
int *member_list_entries);
+cpg_error_t cpg_flow_control_state_get (
+ cpg_handle_t handle,
+ cpg_flow_control_state_t *flow_control_enabled);
+
#endif /* OPENAIS_CPG_H_DEFINED */
Index: include/queue.h
===================================================================
--- include/queue.h (revision 1244)
+++ include/queue.h (revision 1245)
@@ -97,7 +97,7 @@
int empty;
pthread_mutex_lock (&queue->mutex);
- empty = queue->used == 0;
+ empty = (queue->used == 0);
pthread_mutex_unlock (&queue->mutex);
return (empty);
}
@@ -213,4 +213,14 @@
return (used);
}
+static inline int queue_usedhw (struct queue *queue) {
+ int usedhw;
+
+ pthread_mutex_lock (&queue->mutex);
+ usedhw = queue->usedhw;
+ pthread_mutex_unlock (&queue->mutex);
+
+ return (usedhw);
+}
+
#endif /* QUEUE_H_DEFINED */
Index: exec/cfg.c
===================================================================
--- exec/cfg.c (revision 1244)
+++ exec/cfg.c (revision 1245)
@@ -166,6 +166,7 @@
.name = (unsigned char*)"openais configuration service",
.id = CFG_SERVICE,
.private_data_size = 0,
+ .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
.lib_init_fn = cfg_lib_init_fn,
.lib_exit_fn = cfg_lib_exit_fn,
.lib_service = cfg_lib_service,
Index: exec/Makefile
===================================================================
--- exec/Makefile (revision 1244)
+++ exec/Makefile (revision 1245)
@@ -58,9 +58,9 @@
LCR_OBJS = evs.o clm.o ckpt.o evt.o lck.o msg.o cfg.o cpg.o aisparser.o vsf_ykd.o $(AMF_OBJS)
# main executive objects
-MAIN_SRC = main.c print.c mempool.c util.c sync.c service.c ipc.c timer.c \
+MAIN_SRC = main.c print.c mempool.c util.c sync.c service.c ipc.c flow.c timer.c \
totemconfig.c mainconfig.c
-MAIN_OBJS = main.o print.o mempool.o util.o sync.o service.o ipc.o timer.o \
+MAIN_OBJS = main.o print.o mempool.o util.o sync.o service.o ipc.o flow.o timer.o \
totemconfig.o mainconfig.o ../lcr/lcr_ifact.o
OTHER_OBJS = objdb.o
Index: exec/evs.c
===================================================================
--- exec/evs.c (revision 1244)
+++ exec/evs.c (revision 1245)
@@ -145,6 +145,7 @@
.name = (unsigned char*)"openais extended virtual synchrony service",
.id = EVS_SERVICE,
.private_data_size = sizeof (struct evs_pd),
+ .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED,
.lib_init_fn = evs_lib_init_fn,
.lib_exit_fn = evs_lib_exit_fn,
.lib_service = evs_lib_service,
Index: exec/flow.c
===================================================================
--- exec/flow.c (revision 0)
+++ exec/flow.c (revision 1245)
@@ -0,0 +1,463 @@
+/*
+ * Copyright (c) 2006 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake at mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * New messages are allowed from the library ONLY when the processor has not
+ * received a OPENAIS_FLOW_CONTROL_STATE_ENABLED from any processor. If a OPENAIS_FLOW_CONTROL_STATE_ENABLED
+ * message is sent, it must later be cancelled by a OPENAIS_FLOW_CONTROL_STATE_DISABLED
+ * message.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+#include <pthread.h>
+
+#include "flow.h"
+#include "totem.h"
+#include "totempg.h"
+#include "print.h"
+#include "hdb.h"
+#include "../include/list.h"
+
+#define OPENAIS_FLOW_CONTROL_ENABLED_SERVICES_MAX 128
+
+struct flow_control_instance {
+ struct list_head list_head;
+ unsigned int service;
+};
+
+DECLARE_LIST_INIT (flow_control_service_list_head);
+
+struct flow_control_message {
+ unsigned int service __attribute__((aligned(8)));
+ char id[1024] __attribute__((aligned(8)));
+ unsigned int id_len __attribute__((aligned(8)));
+ enum openais_flow_control_state flow_control_state __attribute__((aligned(8)));
+};
+
+struct flow_control_node_state {
+ unsigned int nodeid;
+ enum openais_flow_control_state flow_control_state;
+};
+
+struct flow_control_service {
+ struct flow_control_node_state flow_control_node_state[PROCESSOR_COUNT_MAX];
+ unsigned int service;
+ char id[1024];
+ unsigned int id_len;
+ void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state);
+ void *context;
+ unsigned int processor_count;
+ enum openais_flow_control_state flow_control_state;
+ struct list_head list;
+ struct list_head list_all;
+};
+
+static struct totempg_group flow_control_group = {
+ .group = "flowcontrol",
+ .group_len = 12
+};
+
+static totempg_groups_handle flow_control_handle;
+
+static struct hdb_handle_database flow_control_hdb = {
+ .handle_count = 0,
+ .handles = NULL,
+ .iterator = 0,
+ .mutex = PTHREAD_MUTEX_INITIALIZER
+};
+
+static unsigned int flow_control_member_list[PROCESSOR_COUNT_MAX];
+static unsigned int flow_control_member_list_entries;
+
+static inline int flow_control_xmit (
+ struct flow_control_service *flow_control_service,
+ enum openais_flow_control_state flow_control_state)
+{
+ struct flow_control_message flow_control_message;
+ struct iovec iovec;
+ unsigned int res;
+
+ flow_control_message.service = flow_control_service->service;
+ flow_control_message.flow_control_state = flow_control_state;
+ memcpy (&flow_control_message.id, flow_control_service->id,
+ flow_control_service->id_len);
+ flow_control_message.id_len = flow_control_service->id_len;
+
+ iovec.iov_base = (char *)&flow_control_message;
+ iovec.iov_len = sizeof (flow_control_message);
+
+ res = totempg_groups_mcast_joined (flow_control_handle, &iovec, 1,
+ TOTEMPG_AGREED);
+
+ return (res);
+}
+
+static void flow_control_deliver_fn (
+ unsigned int nodeid,
+ struct iovec *iovec,
+ int iov_len,
+ int endian_conversion_required)
+{
+ struct flow_control_message *flow_control_message = (struct flow_control_message *)iovec[0].iov_base;
+ struct flow_control_service *flow_control_service;
+ struct list_head *list;
+ unsigned int i;
+
+ for (list = flow_control_service_list_head.next;
+ list != &flow_control_service_list_head;
+ list = list->next) {
+
+ flow_control_service = list_entry (list, struct flow_control_service, list_all);
+ /*
+ * Find this nodeid in the flow control service and set the message
+ * enabled or disabled flag
+ */
+ for (i = 0; i < flow_control_service->processor_count; i++) {
+ if (nodeid == flow_control_service->flow_control_node_state[i].nodeid) {
+ flow_control_service->flow_control_node_state[i].flow_control_state =
+ flow_control_message->flow_control_state;
+ break;
+ }
+ }
+
+ /*
+ * Determine if any flow control is enabled on any nodes and set
+ * the internal variable appropriately
+ */
+ flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
+ flow_control_service->flow_control_state_set_fn (flow_control_service->context, flow_control_service->flow_control_state);
+ for (i = 0; i < flow_control_service->processor_count; i++) {
+ if (flow_control_service->flow_control_node_state[i].flow_control_state == OPENAIS_FLOW_CONTROL_STATE_ENABLED) {
+ flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+ flow_control_service->flow_control_state_set_fn (flow_control_service->context, flow_control_service->flow_control_state);
+ }
+ }
+ } /* for list iteration */
+}
+
+static void flow_control_confchg_fn (
+ enum totem_configuration_type configuration_type,
+ unsigned int *member_list, int member_list_entries,
+ unsigned int *left_list, int left_list_entries,
+ unsigned int *joined_list, int joined_list_entries,
+ struct memb_ring_id *ring_id)
+{
+ unsigned int i;
+ struct flow_control_service *flow_control_service;
+ struct list_head *list;
+
+ memcpy (flow_control_member_list, member_list,
+ sizeof (unsigned int) * member_list_entries);
+ flow_control_member_list_entries = member_list_entries;
+
+ for (list = flow_control_service_list_head.next;
+ list != &flow_control_service_list_head;
+ list = list->next) {
+
+ flow_control_service = list_entry (list, struct flow_control_service, list_all);
+
+ /*
+ * Set all of the node ids after a configuration change
+ * Turn off all flow control after a configuration change
+ */
+ flow_control_service->processor_count = flow_control_member_list_entries;
+ flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+ for (i = 0; i < member_list_entries; i++) {
+ flow_control_service->flow_control_node_state[i].nodeid = member_list[i];
+ flow_control_service->flow_control_node_state[i].flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+ }
+ }
+}
+/*
+ * External API
+ */
+unsigned int openais_flow_control_initialize (void)
+{
+ unsigned int res;
+
+ log_init ("FLOW");
+
+ res = totempg_groups_initialize (
+ &flow_control_handle,
+ flow_control_deliver_fn,
+ flow_control_confchg_fn);
+
+ if (res == -1) {
+ log_printf (LOG_LEVEL_ERROR,
+ "Couldn't initialize flow control interface.\n");
+ return (-1);
+ }
+ res = totempg_groups_join (
+ flow_control_handle,
+ &flow_control_group,
+ 1);
+
+ if (res == -1) {
+ log_printf (LOG_LEVEL_ERROR, "Couldn't join flow control group.\n");
+ return (-1);
+ }
+
+ return (0);
+}
+
+unsigned int openais_flow_control_ipc_init (
+ unsigned int *flow_control_handle,
+ unsigned int service)
+{
+ struct flow_control_instance *instance;
+ unsigned int res;
+
+ res = hdb_handle_create (&flow_control_hdb,
+ sizeof (struct flow_control_instance), flow_control_handle);
+ if (res != 0) {
+ goto error_exit;
+ }
+ res = hdb_handle_get (&flow_control_hdb, *flow_control_handle,
+ (void *)&instance);
+ if (res != 0) {
+ goto error_destroy;
+ }
+ instance->service = service;
+
+ list_init (&instance->list_head);
+
+ return (0);
+
+error_destroy:
+ hdb_handle_destroy (&flow_control_hdb, *flow_control_handle);
+error_exit:
+ return (-1);
+
+}
+
+unsigned int openais_flow_control_ipc_exit (
+ unsigned int flow_control_handle)
+{
+ hdb_handle_destroy (&flow_control_hdb, flow_control_handle);
+ return (0);
+}
+
+unsigned int openais_flow_control_create (
+ unsigned int flow_control_handle,
+ unsigned int service,
+ void *id,
+ unsigned int id_len,
+ void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state),
+ void *context)
+{
+ struct flow_control_service *flow_control_service;
+ struct flow_control_instance *instance;
+ unsigned int res;
+ unsigned int i;
+
+ res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
+ (void *)&instance);
+ if (res != 0) {
+ goto error_exit;
+ }
+
+ flow_control_service = malloc (sizeof (struct flow_control_service));
+ if (flow_control_service == NULL) {
+ goto error_put;
+ }
+
+ /*
+ * Add new service to flow control system
+ */
+ memset (flow_control_service, 0, sizeof (struct flow_control_service));
+
+ flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
+ flow_control_service->service = service;
+ memcpy (flow_control_service->id, id, id_len);
+ flow_control_service->id_len = id_len;
+ flow_control_service->flow_control_state_set_fn = flow_control_state_set_fn;
+ flow_control_service->context = context;
+
+ list_init (&flow_control_service->list);
+ list_add_tail (&instance->list_head,
+ &flow_control_service->list);
+
+ list_init (&flow_control_service->list_all);
+ list_add_tail (&flow_control_service_list_head,
+ &flow_control_service->list_all);
+
+ for (i = 0; i < flow_control_member_list_entries; i++) {
+ flow_control_service->flow_control_node_state[i].nodeid = flow_control_member_list[i];
+ flow_control_service->processor_count = flow_control_member_list_entries;
+ }
+error_put:
+ hdb_handle_put (&flow_control_hdb, flow_control_handle);
+
+error_exit:
+ return (res);
+}
+
+unsigned int openais_flow_control_destroy (
+ unsigned int flow_control_identifier,
+ unsigned int service,
+ unsigned char *id,
+ unsigned int id_len)
+{
+ struct flow_control_service *flow_control_service;
+ struct flow_control_instance *instance;
+ struct list_head *list;
+ unsigned int res;
+
+ res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
+ (void *)&instance);
+ if (res != 0) {
+ goto error_exit;
+ }
+
+ for (list = flow_control_service_list_head.next;
+ list != &flow_control_service_list_head;
+ list = list->next) {
+
+ flow_control_service = list_entry (list, struct flow_control_service, list_all);
+
+ if ((flow_control_service->id_len == id_len) &&
+ (memcmp (flow_control_service->id, id, id_len) == 0)) {
+ list_del (&flow_control_service->list);
+ list_del (&flow_control_service->list_all);
+ free (flow_control_service);
+ break; /* done */
+ }
+ }
+ hdb_handle_put (&flow_control_hdb, flow_control_handle);
+
+error_exit:
+ return (res);
+}
+/*
+ * If 1 is returned, flow control is enabled for this service and no
+ * new messages are allowed to be sent from the library for this connection
+ */
+unsigned int openais_flow_control_enabled (
+ unsigned int flow_control_handle)
+{
+ struct flow_control_instance *instance;
+ struct flow_control_service *flow_control_service;
+ struct list_head *list;
+ unsigned int res = 0;
+
+ res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
+ (void *)&instance);
+ if (res != 0) {
+ goto error_exit;
+ }
+
+ for (list = instance->list_head.next;
+ list != &instance->list_head;
+ list = list->next) {
+
+ flow_control_service = list_entry (list, struct flow_control_service, list);
+ if (flow_control_service->flow_control_state == OPENAIS_FLOW_CONTROL_STATE_ENABLED) {
+ res = 1;
+ break;
+ }
+ }
+ hdb_handle_put (&flow_control_hdb, flow_control_handle);
+
+error_exit:
+ return (res);
+}
+
+/*
+ * Disable the ability for new messages to be sent for this service
+ * with the handle id of length id_len
+ */
+unsigned int openais_flow_control_disable (
+ unsigned int flow_control_handle)
+{
+ struct flow_control_instance *instance;
+ struct flow_control_service *flow_control_service;
+ struct list_head *list;
+ unsigned int res;
+ unsigned int i;
+
+ res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
+ (void *)&instance);
+ if (res != 0) {
+ goto error_exit;
+ }
+
+i = 0;
+ for (list = instance->list_head.next;
+ list != &instance->list_head;
+ list = list->next) {
+
+ flow_control_service = list_entry (list, struct flow_control_service, list);
+ flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
+ flow_control_xmit (flow_control_service, OPENAIS_FLOW_CONTROL_STATE_DISABLED);
+ }
+ hdb_handle_put (&flow_control_hdb, flow_control_handle);
+
+error_exit:
+ return (res);
+}
+
+/*
+ * Enable the ability for new messagess to be sent for this service
+ * with the handle id of length id_len
+ */
+unsigned int openais_flow_control_enable (
+ unsigned int flow_control_handle)
+{
+ struct flow_control_instance *instance;
+ struct flow_control_service *flow_control_service;
+ struct list_head *list;
+ unsigned int res;
+
+ res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
+ (void *)&instance);
+ if (res != 0) {
+ goto error_exit;
+ }
+
+ for (list = instance->list_head.next;
+ list != &instance->list_head;
+ list = list->next) {
+
+
+ flow_control_service = list_entry (list, struct flow_control_service, list);
+ flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+ flow_control_xmit (flow_control_service, OPENAIS_FLOW_CONTROL_STATE_ENABLED);
+ }
+ hdb_handle_put (&flow_control_hdb, flow_control_handle);
+
+error_exit:
+ return (res);
+}
Property changes on: exec/flow.c
___________________________________________________________________
Name: svn:executable
+ *
Index: exec/flow.h
===================================================================
--- exec/flow.h (revision 0)
+++ exec/flow.h (revision 1245)
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2006 Red Hat, Inc.
+ *
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake at mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef FLOW_H_DEFINED
+#define FLOW_H_DEFINED
+
+enum openais_flow_control_state {
+ OPENAIS_FLOW_CONTROL_STATE_DISABLED,
+ OPENAIS_FLOW_CONTROL_STATE_ENABLED
+};
+
+unsigned int openais_flow_control_initialize (void);
+
+unsigned int openais_flow_control_ipc_init (
+ unsigned int *flow_control_identifier,
+ unsigned int service);
+
+unsigned int openais_flow_control_ipc_exit (
+ unsigned int flow_control_identifier);
+
+unsigned int openais_flow_control_create (
+ unsigned int flow_control_handle,
+ unsigned int service,
+ void *id,
+ unsigned int id_len,
+ void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state),
+ void *context);
+
+unsigned int openais_flow_control_destroy (
+ unsigned int flow_control_identifier,
+ unsigned int service,
+ unsigned char *id,
+ unsigned int id_len);
+
+unsigned int openais_flow_control_enabled (
+ unsigned int flow_control_identifier);
+
+unsigned int openais_flow_control_disable (
+ unsigned int flow_control_identifier);
+
+unsigned int openais_flow_control_enable (
+ unsigned int flow_control_identifier);
+
+#endif /* FLOW_H_DEFINED */
Index: exec/service.h
===================================================================
--- exec/service.h (revision 1244)
+++ exec/service.h (revision 1245)
@@ -65,6 +65,7 @@
unsigned char *name;
unsigned short id;
unsigned int private_data_size;
+ enum openais_flow_control flow_control;
int (*lib_init_fn) (void *conn);
int (*lib_exit_fn) (void *conn);
struct openais_lib_handler *lib_service;
Index: exec/cpg.c
===================================================================
--- exec/cpg.c (revision 1244)
+++ exec/cpg.c (revision 1245)
@@ -66,6 +66,7 @@
#include "jhash.h"
#include "swab.h"
#include "ipc.h"
+#include "flow.h"
#include "print.h"
#define GROUP_HASH_SIZE 32
@@ -102,6 +103,7 @@
void *conn;
void *trackerconn;
struct group_info *group;
+ enum openais_flow_control_state flow_control_state;
struct list_head list; /* on the group_info members list */
};
@@ -237,6 +239,7 @@
.name = (unsigned char*)"openais cluster closed process group service v1.01",
.id = CPG_SERVICE,
.private_data_size = sizeof (struct process_info),
+ .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED,
.lib_init_fn = cpg_lib_init_fn,
.lib_exit_fn = cpg_lib_exit_fn,
.lib_service = cpg_lib_service,
@@ -304,6 +307,8 @@
mar_cpg_name_t group_name __attribute__((aligned(8)));
mar_uint32_t msglen __attribute__((aligned(8)));
mar_uint32_t pid __attribute__((aligned(8)));
+ mar_uint32_t flow_control_state __attribute__((aligned(8)));
+ mar_message_source_t source __attribute__((aligned(8)));
mar_uint8_t message[] __attribute__((aligned(8)));
};
@@ -608,6 +613,15 @@
}
}
+static void cpg_flow_control_state_set_fn (
+ void *context,
+ enum openais_flow_control_state flow_control_state)
+{
+ struct process_info *process_info = (struct process_info *)context;
+
+ process_info->flow_control_state = flow_control_state;
+}
+
/* Can byteswap join & leave messages */
static void exec_cpg_procjoin_endian_convert (void *msg)
{
@@ -640,7 +654,8 @@
swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
-
+ req_exec_cpg_mcast->flow_control_state = swab32(req_exec_cpg_mcast->flow_control_state);
+ swab_mar_message_source_t (&req_exec_cpg_mcast->source);
}
static void do_proc_join(
@@ -787,6 +802,12 @@
struct group_info *gi;
struct list_head *iter;
+ /*
+ * Track local messages so that flow is controlled on the local node
+ */
+ if (message_source_is_local (&req_exec_cpg_mcast->source)) {
+ openais_ipc_flow_control_local_decrement (req_exec_cpg_mcast->source.conn);
+ }
gi = get_group(&req_exec_cpg_mcast->group_name); /* this will always succeed ! */
assert(gi);
@@ -796,6 +817,7 @@
res_lib_cpg_mcast->msglen = msglen;
res_lib_cpg_mcast->pid = req_exec_cpg_mcast->pid;
res_lib_cpg_mcast->nodeid = nodeid;
+ res_lib_cpg_mcast->flow_control_state = 0;
memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
sizeof(mar_cpg_name_t));
memcpy(&res_lib_cpg_mcast->message, (char*)message+sizeof(*req_exec_cpg_mcast),
@@ -911,6 +933,14 @@
goto join_err;
}
+ openais_ipc_flow_control_create (
+ conn,
+ CPG_SERVICE,
+ req_lib_cpg_join->group_name.value,
+ req_lib_cpg_join->group_name.length,
+ cpg_flow_control_state_set_fn,
+ pi);
+
/* Add a node entry for us */
pi->nodeid = this_ip->nodeid;
pi->pid = req_lib_cpg_join->pid;
@@ -948,6 +978,12 @@
cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_LEAVE);
pi->group = NULL;
+ openais_ipc_flow_control_destroy (
+ conn,
+ CPG_SERVICE,
+ gi->group_name.value,
+ gi->group_name.length);
+
leave_ret:
/* send return */
res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
@@ -984,6 +1020,8 @@
MESSAGE_REQ_EXEC_CPG_MCAST);
req_exec_cpg_mcast.pid = pi->pid;
req_exec_cpg_mcast.msglen = msglen;
+ req_exec_cpg_mcast.flow_control_state = pi->flow_control_state;
+ message_source_set (&req_exec_cpg_mcast.source, conn);
memcpy(&req_exec_cpg_mcast.group_name, &gi->group_name,
sizeof(mar_cpg_name_t));
@@ -994,6 +1032,7 @@
// TODO: guarantee type...
result = totempg_groups_mcast_joined (openais_group_handle, req_exec_cpg_iovec, 2, TOTEMPG_AGREED);
+ openais_ipc_flow_control_local_increment (conn);
res.size = sizeof(res);
res.id = MESSAGE_RES_CPG_MCAST;
Index: exec/clm.c
===================================================================
--- exec/clm.c (revision 1244)
+++ exec/clm.c (revision 1245)
@@ -202,6 +202,7 @@
.name = (unsigned char*)"openais cluster membership service B.01.01",
.id = CLM_SERVICE,
.private_data_size = sizeof (struct clm_pd),
+ .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
.lib_init_fn = clm_lib_init_fn,
.lib_exit_fn = clm_lib_exit_fn,
.lib_service = clm_lib_service,
Index: exec/ipc.c
===================================================================
--- exec/ipc.c (revision 1244)
+++ exec/ipc.c (revision 1245)
@@ -67,6 +67,7 @@
#include "totemconfig.h"
#include "main.h"
#include "ipc.h"
+#include "flow.h"
#include "service.h"
#include "sync.h"
#include "swab.h"
@@ -80,10 +81,25 @@
#define SERVER_BACKLOG 5
+/*
+ * When there are this many entries left in a queue, turn on flow control
+ */
+#define FLOW_CONTROL_ENTRIES_ENABLE 400
+
+/*
+ * When there are this many entries in a queue, turn off flow control
+ */
+#define FLOW_CONTROL_ENTRIES_DISABLE 64
+
+
static unsigned int g_gid_valid = 0;
static struct totem_ip_address *my_ip;
+static totempg_groups_handle ipc_handle;
+
+DECLARE_LIST_INIT (conn_info_list_head);
+
static void (*ipc_serialize_lock_fn) (void);
static void (*ipc_serialize_unlock_fn) (void);
@@ -117,16 +133,22 @@
int authenticated; /* Is this connection authenticated? */
void *private_data; /* library connection private data */
struct conn_info *conn_info_partner; /* partner connection dispatch<->response */
+ unsigned int flow_control_handle; /* flow control identifier */
+ unsigned int flow_control_enabled; /* flow control enabled bit */
+ unsigned int flow_control_local_count; /* flow control local count */
+ enum openais_flow_control flow_control; /* Does this service use IPC flow control */
+ pthread_mutex_t flow_control_mutex;
int (*lib_exit_fn) (void *conn);
struct timerlist timerlist;
pthread_mutex_t mutex;
pthread_mutex_t *shared_mutex;
-
+ struct list_head list;
};
static void *prioritized_poll_thread (void *conn);
static int conn_info_outq_flush (struct conn_info *conn_info);
static void libais_deliver (struct conn_info *conn_info);
+static void ipc_flow_control (struct conn_info *conn_info);
/*
* IPC Initializers
@@ -245,6 +267,15 @@
conn_info->conn_info_partner->state = CONN_STATE_ACTIVE;
conn_info->lib_exit_fn = ais_service[conn_info->service]->lib_exit_fn;
ais_service[conn_info->service]->lib_init_fn (conn_info);
+
+ conn_info->flow_control = ais_service[conn_info->service]->flow_control;
+ conn_info->conn_info_partner->flow_control = ais_service[conn_info->service]->flow_control;
+ if (ais_service[conn_info->service]->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) {
+ openais_flow_control_ipc_init (
+ &conn_info->flow_control_handle,
+ conn_info->service);
+
+ }
return (0);
}
@@ -283,6 +314,7 @@
}
pthread_mutex_init (&conn_info->mutex, NULL);
+ pthread_mutex_init (&conn_info->flow_control_mutex, NULL);
pthread_mutex_init (conn_info->shared_mutex, NULL);
conn_info->state = CONN_STATE_ACTIVE;
@@ -290,6 +322,9 @@
conn_info->events = POLLIN|POLLNVAL;
conn_info->service = SOCKET_SERVICE_INIT;
+ list_init (&conn_info->list);
+ list_add (&conn_info_list_head, &conn_info->list);
+
pthread_attr_init (&conn_info->thread_attr);
pthread_attr_setstacksize (&conn_info->thread_attr, 200000);
pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_DETACHED);
@@ -316,6 +351,7 @@
if (conn_info->conn_info_partner) {
conn_info->conn_info_partner->conn_info_partner = NULL;
}
+ list_del (&conn_info->list);
free (conn_info);
}
@@ -372,6 +408,9 @@
}
conn_info->state = CONN_STATE_DISCONNECTED;
conn_info->conn_info_partner->state = CONN_STATE_DISCONNECTED;
+ if (conn_info->flow_control_enabled == 1) {
+ openais_flow_control_disable (conn_info->flow_control_handle);
+ }
return (0);
}
@@ -487,6 +526,9 @@
if ((ufd.revents & POLLIN) == POLLIN) {
libais_deliver (conn_info);
}
+
+ ipc_flow_control (conn_info);
+
}
ipc_serialize_unlock_fn ();
@@ -512,6 +554,42 @@
#endif
+static void ipc_flow_control (struct conn_info *conn_info)
+{
+ unsigned int entries_used;
+ unsigned int entries_usedhw;
+
+ entries_used = queue_used (&conn_info->outq);
+ if (conn_info->flow_control_local_count > entries_used) {
+ entries_used = conn_info->flow_control_local_count;
+ }
+ /*
+ * IPC group-wide flow control
+ */
+ if (conn_info->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) {
+ if (conn_info->flow_control_enabled == 0 &&
+ ((entries_used + FLOW_CONTROL_ENTRIES_ENABLE) > SIZEQUEUE)) {
+
+ entries_usedhw = queue_usedhw (&conn_info->outq);
+ log_printf (LOG_LEVEL_NOTICE, "Enabling flow control - HW mark %d of %d %p.\n", entries_usedhw, SIZEQUEUE, &conn_info->outq);
+ openais_flow_control_enable (conn_info->flow_control_handle);
+ conn_info->flow_control_enabled = 1;
+ conn_info->conn_info_partner->flow_control_enabled = 1;
+ }
+ if (conn_info->flow_control_enabled == 1 &&
+
+ entries_used <= FLOW_CONTROL_ENTRIES_DISABLE) {
+ entries_usedhw = queue_usedhw (&conn_info->outq);
+
+ log_printf (LOG_LEVEL_NOTICE, "Disabling flow control - HW mark [%d/%d].\n",
+ entries_usedhw, SIZEQUEUE);
+ openais_flow_control_disable (conn_info->flow_control_handle);
+ conn_info->flow_control_enabled = 0;
+ conn_info->conn_info_partner->flow_control_enabled = 0;
+ }
+ }
+}
+
static int conn_info_outq_flush (struct conn_info *conn_info) {
struct queue *outq;
int res = 0;
@@ -574,6 +652,7 @@
if (queue_is_empty (outq)) {
conn_info->events = POLLIN|POLLNVAL;
}
+
return (0);
}
@@ -632,7 +711,9 @@
iov_recv.iov_base = &conn_info->inb[conn_info->inb_start];
iov_recv.iov_len = (SIZEINB) - conn_info->inb_start;
- assert (iov_recv.iov_len != 0);
+ if (conn_info->inb_inuse == SIZEINB) {
+ return;
+ }
retry_recv:
res = recvmsg (conn_info->fd, &msg_recv, MSG_NOSIGNAL);
@@ -714,6 +795,16 @@
(send_ok_joined) &&
(sync_in_process() == 0)));
+ /*
+ * Check if flow control on new messages is enabled
+ * for this service
+ */
+ if ((send_ok == 1) &&
+ ((ais_service[conn_info->service]->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED &&
+ openais_flow_control_enabled (conn_info->flow_control_handle) == 1)) || (conn_info->flow_control_enabled == 1) || (conn_info->conn_info_partner->flow_control_enabled == 1)) {
+ send_ok = 0;
+ }
+
if (send_ok) {
ais_service[service]->lib_service[header->id].lib_handler_fn(conn_info, header);
} else {
@@ -831,6 +922,29 @@
source->conn = conn;
}
+static void ipc_confchg_fn (
+ enum totem_configuration_type configuration_type,
+ unsigned int *member_list, int member_list_entries,
+ unsigned int *left_list, int left_list_entries,
+ unsigned int *joined_list, int joined_list_entries,
+ struct memb_ring_id *ring_id)
+{
+ struct conn_info *conn_info;
+ struct list_head *list;
+
+ /*
+ * Turn on flow control enabled flag for all connections
+ */
+ for (list = conn_info_list_head.next;
+ list != &conn_info_list_head;
+ list = list->next) {
+
+ conn_info = list_entry (list, struct conn_info, list);
+ conn_info->flow_control_enabled = 1;
+ conn_info->conn_info_partner->flow_control_enabled = 1;
+ }
+}
+
void openais_ipc_init (
void (*serialize_lock_fn) (void),
void (*serialize_unlock_fn) (void),
@@ -893,6 +1007,15 @@
g_gid_valid = gid_valid;
my_ip = my_ip_in;
+
+ /*
+ * Reset internal state of flow control when
+ * configuration change occurs
+ */
+ res = totempg_groups_initialize (
+ &ipc_handle,
+ NULL,
+ ipc_confchg_fn);
}
@@ -947,6 +1070,9 @@
if (!libais_connection_active (conn_info)) {
return (-1);
}
+
+ ipc_flow_control (conn_info);
+
outq = &conn_info->outq;
msg_send.msg_iov = &iov_send;
@@ -1095,3 +1221,62 @@
timerlist_del (&conn_info->timerlist, timer_handle);
}
+
+void openais_ipc_flow_control_create (
+ void *conn,
+ unsigned int service,
+ char *id,
+ int id_len,
+ void (*flow_control_state_set_fn) (void *conn, enum openais_flow_control_state),
+ void *context)
+{
+ struct conn_info *conn_info = (struct conn_info *)conn;
+
+ openais_flow_control_create (
+ conn_info->flow_control_handle,
+ service,
+ id,
+ id_len,
+ flow_control_state_set_fn,
+ context);
+ conn_info->conn_info_partner->flow_control_handle = conn_info->flow_control_handle;
+}
+
+void openais_ipc_flow_control_destroy (
+ void *conn,
+ unsigned int service,
+ char *id,
+ int id_len)
+{
+ struct conn_info *conn_info = (struct conn_info *)conn;
+
+ openais_flow_control_destroy (
+ conn_info->flow_control_handle,
+ service,
+ id,
+ id_len);
+}
+
+void openais_ipc_flow_control_local_increment (
+ void *conn)
+{
+ struct conn_info *conn_info = (struct conn_info *)conn;
+
+ pthread_mutex_lock (&conn_info->flow_control_mutex);
+
+ conn_info->flow_control_local_count++;
+
+ pthread_mutex_unlock (&conn_info->flow_control_mutex);
+}
+
+void openais_ipc_flow_control_local_decrement (
+ void *conn)
+{
+ struct conn_info *conn_info = (struct conn_info *)conn;
+
+ pthread_mutex_lock (&conn_info->flow_control_mutex);
+
+ conn_info->flow_control_local_count--;
+
+ pthread_mutex_unlock (&conn_info->flow_control_mutex);
+}
Index: exec/ipc.h
===================================================================
--- exec/ipc.h (revision 1244)
+++ exec/ipc.h (revision 1245)
@@ -36,6 +36,7 @@
#define IPC_H_DEFINED
#include "tlist.h"
+#include "flow.h"
extern void message_source_set (mar_message_source_t *source, void *conn);
@@ -68,4 +69,24 @@
void *conn,
timer_handle timer_handle);
+extern void openais_ipc_flow_control_create (
+ void *conn,
+ unsigned int service,
+ char *id,
+ int id_len,
+ void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state_set),
+ void *context);
+
+extern void openais_ipc_flow_control_destroy (
+ void *conn,
+ unsigned int service,
+ char *id,
+ int id_len);
+
+extern void openais_ipc_flow_control_local_increment (
+ void *conn);
+
+extern void openais_ipc_flow_control_local_decrement (
+ void *conn);
+
#endif /* IPC_H_DEFINED */
Index: exec/amfcomp.c
===================================================================
--- exec/amfcomp.c (revision 1244)
+++ exec/amfcomp.c (revision 1245)
@@ -1411,6 +1411,7 @@
assert (0);
break;
}
+ return 0;
}
/**
@@ -1687,4 +1688,6 @@
}
assert (0);
+ /* XXX we fall here in case NDEBUG is set */
+ return -1;
}
Index: exec/evt.c
===================================================================
--- exec/evt.c (revision 1244)
+++ exec/evt.c (revision 1245)
@@ -215,6 +215,7 @@
(unsigned char*)"openais event service B.01.01",
.id = EVT_SERVICE,
.private_data_size = sizeof (struct libevt_pd),
+ .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
.lib_init_fn = evt_lib_init,
.lib_exit_fn = evt_lib_exit,
.lib_service = evt_lib_service,
Index: exec/ckpt.c
===================================================================
--- exec/ckpt.c (revision 1244)
+++ exec/ckpt.c (revision 1245)
@@ -545,6 +545,7 @@
.name = (unsigned char *)"openais checkpoint service B.01.01",
.id = CKPT_SERVICE,
.private_data_size = sizeof (struct ckpt_pd),
+ .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
.lib_init_fn = ckpt_lib_init_fn,
.lib_exit_fn = ckpt_lib_exit_fn,
.lib_service = ckpt_lib_service,
Index: exec/amf.c
===================================================================
--- exec/amf.c (revision 1244)
+++ exec/amf.c (revision 1245)
@@ -266,6 +266,7 @@
.name = (unsigned char *)"openais availability management framework B.01.01",
.id = AMF_SERVICE,
.private_data_size = sizeof (struct amf_pd),
+ .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
.lib_init_fn = amf_lib_init_fn,
.lib_exit_fn = amf_lib_exit_fn,
.lib_service = amf_lib_service,
Index: exec/lck.c
===================================================================
--- exec/lck.c (revision 1244)
+++ exec/lck.c (revision 1245)
@@ -283,6 +283,7 @@
.name = (unsigned char*)"openais distributed locking service B.01.01",
.id = LCK_SERVICE,
.private_data_size = sizeof (struct lck_pd),
+ .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
.lib_init_fn = lck_lib_init_fn,
.lib_exit_fn = lck_lib_exit_fn,
.lib_service = lck_lib_service,
Index: exec/main.c
===================================================================
--- exec/main.c (revision 1244)
+++ exec/main.c (revision 1245)
@@ -75,6 +75,7 @@
#include "timer.h"
#include "print.h"
#include "util.h"
+#include "flow.h"
#include "version.h"
#define SERVER_BACKLOG 5
@@ -538,6 +539,9 @@
sync_register (openais_sync_callbacks_retrieve, openais_sync_completed,
totem_config.vsf_type);
+
+ res = openais_flow_control_initialize ();
+
/*
* Drop root privleges to user 'ais'
* TODO: Don't really need full root capabilities;
Index: exec/msg.c
===================================================================
--- exec/msg.c (revision 1244)
+++ exec/msg.c (revision 1245)
@@ -436,6 +436,7 @@
.name = (unsigned char *)"openais message service B.01.01",
.id = MSG_SERVICE,
.private_data_size = sizeof (struct msg_pd),
+ .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
.lib_init_fn = msg_lib_init_fn,
.lib_exit_fn = msg_lib_exit_fn,
.lib_service = msg_lib_service,
Index: exec/main.h
===================================================================
--- exec/main.h (revision 1244)
+++ exec/main.h (revision 1245)
@@ -46,7 +46,7 @@
* Size of the queue (entries) for I/O's to the API over socket IPC.
*/
-#define SIZEQUEUE 256
+#define SIZEQUEUE 800
#define SOCKET_SERVICE_INIT 254
Index: lib/cpg.c
===================================================================
--- lib/cpg.c (revision 1244)
+++ lib/cpg.c (revision 1245)
@@ -55,6 +55,7 @@
int response_fd;
int dispatch_fd;
int finalize;
+ cpg_flow_control_state_t flow_control_state;
cpg_callbacks_t callbacks;
pthread_mutex_t response_mutex;
pthread_mutex_t dispatch_mutex;
@@ -306,6 +307,7 @@
case MESSAGE_RES_CPG_DELIVER_CALLBACK:
res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)&dispatch_data;
+ cpg_inst->flow_control_state = res_cpg_deliver_callback->flow_control_state;
marshall_from_mar_cpg_name_t (
&group_name,
&res_cpg_deliver_callback->group_name);
@@ -352,7 +354,6 @@
res_cpg_confchg_callback->joined_list_entries);
break;
-
default:
error = SA_AIS_ERR_LIBRARY;
goto error_nounlock;
@@ -536,6 +537,10 @@
goto error_exit;
}
+ cpg_inst->flow_control_state = CPG_FLOW_CONTROL_DISABLED;
+ if (res_lib_cpg_mcast.error == CPG_ERR_TRY_AGAIN) {
+ cpg_inst->flow_control_state = CPG_FLOW_CONTROL_ENABLED;
+ }
error = res_lib_cpg_mcast.error;
error_exit:
@@ -600,4 +605,22 @@
return (error);
}
+cpg_error_t cpg_flow_control_state_get (
+ cpg_handle_t handle,
+ cpg_flow_control_state_t *flow_control_state)
+{
+ cpg_error_t error;
+ struct cpg_inst *cpg_inst;
+
+ error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
+ if (error != SA_AIS_OK) {
+ return (error);
+ }
+
+ *flow_control_state = cpg_inst->flow_control_state;
+
+ saHandleInstancePut (&cpg_handle_t_db, handle);
+
+ return (error);
+}
/** @} */
Index: openais.spec
===================================================================
RCS file: /cvs/dist/rpms/openais/devel/openais.spec,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- openais.spec 16 Aug 2006 04:45:44 -0000 1.12
+++ openais.spec 25 Sep 2006 17:34:06 -0000 1.13
@@ -1,12 +1,15 @@
Name: openais
Summary: The openais Standards-Based Cluster Framework executive and APIs
Version: 0.80.1
-Release: 1.0
+Release: 1.1
License: BSD
Group: System Environment/Base
URL: http://developer.osdl.org/dev/openais/
Source0: http://developer.osdl.org/dev/openais/downloads/openais-%{version}/openais-%{version}.tar.gz
Patch0: openais-0.76-defaultconfig.patch
+Patch1: revision-1223.patch
+Patch2: revision-1230.patch
+Patch3: revision-1245.patch
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
ExclusiveArch: i386 ppc x86_64 ppc64 ia64 s390 s390x
Requires(pre): /usr/sbin/useradd
@@ -30,10 +33,16 @@
%prep
%setup -q -n openais-%{version}
+%patch0
+%patch1
+%patch2
+%patch3
%build
# -O3 required for performance reasons
-CFLAGS="$(echo '%{optflags}' | sed -e 's/-O[0-9]*//') -O3"
+# So we get proper debug output, for now we don't compile with O3
+#CFLAGS="$(echo '%{optflags}' | sed -e 's/-O[0-9]*//') -O3"
+CFLAGS="$(echo '%{optflags}')"
make CFLAGS="$CFLAGS"
%install
@@ -56,6 +65,7 @@
%post
/sbin/chkconfig --add openais || :
+/sbin/ldconfig > /dev/null
%preun
if [ $1 -eq 0 ]; then
@@ -65,10 +75,7 @@
%postun
[ "$1" -ge "1" ] && %{_initrddir}/openais condrestart &>/dev/null || :
-
-%post devel -p /sbin/ldconfig
-
-%postun devel -p /sbin/ldconfig
+/sbin/ldconfig > /dev/null
%files
%defattr(-,root,root,-)
@@ -163,6 +170,13 @@
%{_mandir}/man3/evs_membership_get.3*
%changelog
+* Mon Sep 25 2006 Steven Dake <sdake at redhat.com> - 0.80.1-1.1
+- Add upstream revision 1223 - Fix checkpoint write size of zero to
+ return INVALID_PARAM error code.
+- Add upstream revision 1230 - Add missing include for assert.h.
+- Add upstream revision 1245 - Add cpgbench tool and better flow control system.
+- Move /sbin/ldconfig into regular package from devel package.
+
* Tue Aug 15 2006 Steven Dake <sdake at redhat.com> - 0.80.1-1.0
- New stable upstream release
- Previous message (by thread): rpms/xorg-x11-xinit/devel Xclients, 1.2, 1.3 xinitrc, 1.2, 1.3 xinitrc-common, 1.1, 1.2 xorg-x11-xinit.spec, 1.29, 1.30
- Next message (by thread): rpms/glibc/devel .cvsignore, 1.177, 1.178 glibc-fedora.patch, 1.185, 1.186 glibc.spec, 1.272, 1.273 sources, 1.201, 1.202
- Messages sorted by:
[ date ]
[ thread ]
[ subject ]
[ author ]
More information about the fedora-cvs-commits
mailing list