[Date Prev][Date Next] [Thread Prev][Thread Next]
[Thread Index]
[Date Index]
[Author Index]
[libvirt] Re: [PATCH 07/12] Domain Events - remote driver
- From: Ben Guthro <bguthro virtualiron com>
- To: libvir-list redhat com
- Subject: [libvirt] Re: [PATCH 07/12] Domain Events - remote driver
- Date: Tue, 21 Oct 2008 15:16:20 -0400
[PATCH 07/12] Domain Events - remote driver
Deliver local callbacks in response to remote events
remote_internal.c | 276 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 271 insertions(+), 5 deletions(-)
diff --git a/src/remote_internal.c b/src/remote_internal.c
index 35b7b4b..8875674 100644
--- a/src/remote_internal.c
+++ b/src/remote_internal.c
@@ -34,6 +34,7 @@
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
+#include <sys/poll.h>
#include <fcntl.h>
#ifdef HAVE_SYS_WAIT_H
@@ -73,6 +74,7 @@
#include "remote_protocol.h"
#include "memory.h"
#include "util.h"
+#include "event.h"
/* Per-connection private data. */
#define MAGIC 999 /* private_data->magic if OK */
@@ -97,6 +99,13 @@ struct private_data {
unsigned int saslDecodedLength;
unsigned int saslDecodedOffset;
#endif
+ /* The list of domain event callbacks */
+ virDomainEventCallbackListPtr callbackList;
+ /* The queue of domain events generated
+ during a call / response rpc */
+ virDomainEventQueuePtr domainEvents;
+ /* Timer for flushing domainEvents queue */
+ int eventFlushTimer;
};
#define GET_PRIVATE(conn,retcode) \
@@ -156,7 +165,10 @@ static void make_nonnull_domain (remote_nonnull_domain *dom_dst, virDomainPtr do
static void make_nonnull_network (remote_nonnull_network *net_dst, virNetworkPtr net_src);
static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src);
static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src);
-
+void remoteDomainEventFired(int fd, virEventHandleType event, void *data);
+static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr);
+static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr);
+void remoteDomainEventQueueFlush(int timer, void *opaque);
/*----------------------------------------------------------------------*/
/* Helper functions for remoteOpen. */
@@ -680,6 +692,36 @@ doRemoteOpen (virConnectPtr conn,
(xdrproc_t) xdr_void, (char *) NULL) == -1)
goto failed;
+ if(VIR_ALLOC(priv->callbackList)<0) {
+ error(conn, VIR_ERR_INVALID_ARG, _("Error allocating callbacks list"));
+ goto failed;
+ }
+
+ if(VIR_ALLOC(priv->domainEvents)<0) {
+ error(conn, VIR_ERR_INVALID_ARG, _("Error allocating domainEvents"));
+ goto failed;
+ }
+
+ DEBUG0("Adding Handler for remote events");
+ /* Set up a callback to listen on the socket data */
+ if (virEventAddHandle(priv->sock,
+ VIR_EVENT_HANDLE_READABLE |
+ VIR_EVENT_HANDLE_ERROR |
+ VIR_EVENT_HANDLE_HANGUP,
+ remoteDomainEventFired,
+ conn) < 0) {
+ DEBUG0("virEventAddHandle failed: No addHandleImpl defined."
+ " continuing without events.");
+ } else {
+
+ DEBUG0("Adding Timeout for remote event queue flushing");
+ if ( (priv->eventFlushTimer = virEventAddTimeout(-1,
+ remoteDomainEventQueueFlush,
+ conn)) < 0) {
+ DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. "
+ "continuing without events.");
+ }
+ }
/* Successful. */
retcode = VIR_DRV_OPEN_SUCCESS;
@@ -1101,6 +1143,11 @@ doRemoteClose (virConnectPtr conn, struct private_data *priv)
(xdrproc_t) xdr_void, (char *) NULL) == -1)
return -1;
+ /* Remove handle for remote events */
+ virEventRemoveHandle(priv->sock);
+ /* Remove timeout */
+ virEventRemoveTimeout(priv->eventFlushTimer);
+
/* Close socket. */
if (priv->uses_tls && priv->session) {
gnutls_bye (priv->session, GNUTLS_SHUT_RDWR);
@@ -1132,6 +1179,12 @@ doRemoteClose (virConnectPtr conn, struct private_data *priv)
/* Free private data. */
priv->magic = DEAD;
+ /* Free callback list */
+ virDomainEventCallbackListFree(priv->callbackList);
+
+ /* Free queued events */
+ virDomainEventQueueFree(priv->domainEvents);
+
return 0;
}
@@ -4288,6 +4341,52 @@ remoteAuthPolkit (virConnectPtr conn, struct private_data *priv, int in_open,
return 0;
}
#endif /* HAVE_POLKIT */
+/*----------------------------------------------------------------------*/
+
+/**
+ * remoteDomainEventRegister:
+ * @conn: connection whose private data holds the callback list
+ * @callback: callback to add to the connection's callback list
+ * @opaque: user data stored alongside the callback
+ *
+ * Adds the callback to the local list kept in the connection's private
+ * data.  When this is the first callback on the list, the server is
+ * notified with REMOTE_PROC_DOMAIN_EVENTS_REGISTER.
+ *
+ * Returns 0 on success, -1 if the callback could not be added or the
+ * remote call failed.
+ */
+static int remoteDomainEventRegister (virConnectPtr conn,
+                                      void *callback,
+                                      void *opaque)
+{
+    struct private_data *priv = conn->privateData;
+
+    if (virDomainEventCallbackListAdd(conn, priv->callbackList,
+                                      callback, opaque) < 0) {
+        error (conn, VIR_ERR_RPC, _("adding cb to list"));
+        return -1;
+    }
+
+    if ( priv->callbackList->count == 1 ) {
+        /* Tell the server when we are the first callback registering */
+        if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_REGISTER,
+                  (xdrproc_t) xdr_void, (char *) NULL,
+                  (xdrproc_t) xdr_void, (char *) NULL) == -1) {
+            /* Undo the local add: otherwise the list would stay non-empty,
+             * and a later registration would see count == 2 and never
+             * notify the server. */
+            (void) virDomainEventCallbackListRemove(conn, priv->callbackList,
+                                                    callback);
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+/**
+ * remoteDomainEventDeregister:
+ * @conn: connection whose private data holds the callback list
+ * @callback: callback to remove from the connection's callback list
+ *
+ * Removes the callback from the local list kept in the connection's
+ * private data.  When the list becomes empty, the server is notified
+ * with REMOTE_PROC_DOMAIN_EVENTS_DEREGISTER.
+ *
+ * Returns 0 on success, -1 if the callback could not be removed or the
+ * remote call failed.
+ */
+static int remoteDomainEventDeregister (virConnectPtr conn,
+                                        void *callback)
+{
+    struct private_data *priv = conn->privateData;
+
+    if (virDomainEventCallbackListRemove(conn, priv->callbackList,
+                                         callback) < 0) {
+        error (conn, VIR_ERR_RPC, _("removing cb from list"));
+        return -1;
+    }
+
+    if ( priv->callbackList->count == 0 ) {
+        /* Tell the server when we are the last callback deregistering */
+        if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_DEREGISTER,
+                  (xdrproc_t) xdr_void, (char *) NULL,
+                  (xdrproc_t) xdr_void, (char *) NULL) == -1)
+            return -1;
+    }
+
+    return 0;
+}
/*----------------------------------------------------------------------*/
@@ -4367,6 +4466,7 @@ call (virConnectPtr conn, struct private_data *priv,
really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1)
return -1;
+retry_read:
/* Read and deserialise length word. */
if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1)
return -1;
@@ -4418,10 +4518,20 @@ call (virConnectPtr conn, struct private_data *priv,
return -1;
}
- /* If we extend the server to actually send asynchronous messages, then
- * we'll need to change this so that it can recognise an asynch
- * message being received at this point.
- */
+ if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
+ hdr.direction == REMOTE_MESSAGE) {
+ /* An async message has come in while we were waiting for the
+ * response. Process it to pull it off the wire, and try again
+ */
+ DEBUG0("Encountered an event while waiting for a response");
+
+ remoteDomainQueueEvent(conn, &xdr);
+ virEventUpdateTimeout(priv->eventFlushTimer, 0);
+
+ DEBUG0("Retrying read");
+ xdr_destroy (&xdr);
+ goto retry_read;
+ }
if (hdr.proc != proc_nr) {
__virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
NULL, NULL, VIR_FROM_REMOTE,
@@ -4872,6 +4982,8 @@ static virDriver driver = {
.domainMemoryPeek = remoteDomainMemoryPeek,
.nodeGetCellsFreeMemory = remoteNodeGetCellsFreeMemory,
.getFreeMemory = remoteNodeGetFreeMemory,
+ .domainEventRegister = remoteDomainEventRegister,
+ .domainEventDeregister = remoteDomainEventDeregister,
};
static virNetworkDriver network_driver = {
@@ -4957,3 +5069,157 @@ remoteRegister (void)
return 0;
}
+
+/**
+ * remoteDomainReadEvent
+ * @conn: connection the event arrived on
+ * @xdr: decode stream positioned at the event payload
+ * @dom: filled in with the domain object built from the decoded message
+ * @event: filled in with the event code carried by the message
+ *
+ * Read the event data off the wire.
+ *
+ * Returns 0 on success, -1 if the payload could not be unmarshalled or
+ * no domain object could be obtained for it.  On failure *dom must not
+ * be used.
+ */
+static int
+remoteDomainReadEvent(virConnectPtr conn, XDR *xdr,
+                      virDomainPtr *dom, int *event)
+{
+    remote_domain_event_ret ret;
+    memset (&ret, 0, sizeof ret);
+
+    /* unmarshall parameters, and process it*/
+    if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
+        error (conn, VIR_ERR_RPC,
+               _("remoteDomainProcessEvent: unmarshalling ret"));
+        return -1;
+    }
+
+    *dom = get_nonnull_domain(conn,ret.dom);
+    *event = ret.event;
+
+    /* Release whatever the XDR decoder allocated for the message.
+     * NOTE(review): assumes get_nonnull_domain() copies what it needs
+     * out of ret.dom rather than keeping pointers into it - confirm. */
+    xdr_free ((xdrproc_t) xdr_remote_domain_event_ret, (char *) &ret);
+
+    /* Callers dereference the domain, so a missing one is a failure. */
+    if (*dom == NULL)
+        return -1;
+
+    return 0;
+}
+
+/**
+ * remoteDomainProcessEvent
+ *
+ * Decode one domain event from @xdr and hand it straight to every
+ * callback registered on @conn, without going through the event queue.
+ * Nothing is dispatched when the event cannot be read.
+ */
+static void
+remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr)
+{
+    struct private_data *priv = conn->privateData;
+    virDomainPtr dom;
+    int event;
+    int idx;
+
+    if (remoteDomainReadEvent(conn, xdr, &dom, &event) != 0)
+        return;
+
+    DEBUG0("Calling domain event callbacks (no queue)");
+    for (idx = 0; idx < priv->callbackList->count; idx++) {
+        /* Skip empty slots in the callback array */
+        if (!priv->callbackList->callbacks[idx])
+            continue;
+
+        priv->callbackList->callbacks[idx]->cb(
+            conn, dom, event,
+            priv->callbackList->callbacks[idx]->opaque);
+    }
+}
+
+/**
+ * remoteDomainQueueEvent
+ *
+ * Decode one domain event from @xdr and append it to the connection's
+ * pending-event queue instead of dispatching it immediately.  A failure
+ * to queue is only logged.
+ */
+static void
+remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
+{
+    struct private_data *priv = conn->privateData;
+    virDomainPtr dom;
+    int event;
+
+    if (remoteDomainReadEvent(conn, xdr, &dom, &event) != 0)
+        return;
+
+    if (virDomainEventCallbackQueuePush(priv->domainEvents, dom, event) < 0)
+        DEBUG("%s", "Error adding event to queue");
+}
+
+/** remoteDomainEventFired:
+ * @fd: descriptor the event loop reported on; its handle is removed on
+ *      hangup or error
+ * @event: bitmask of VIR_EVENT_HANDLE_* conditions that fired
+ * @opaque: the virConnectPtr passed when the handle was registered
+ *
+ * The callback for monitoring the remote socket
+ * for event data.
+ *
+ * Reads one complete message (length word, then header and payload)
+ * from the connection.  A REMOTE_PROC_DOMAIN_EVENT message with
+ * direction REMOTE_MESSAGE is dispatched to the registered callbacks;
+ * anything else is reported as an RPC error.
+ */
+void
+remoteDomainEventFired(int fd,
+                       virEventHandleType event,
+                       void *opaque)
+{
+    char buffer[REMOTE_MESSAGE_MAX];
+    char buffer2[4];
+    struct remote_message_header hdr;
+    XDR xdr;
+    int len;
+
+    virConnectPtr conn = opaque;
+    struct private_data *priv = conn->privateData;
+
+    DEBUG("%s : Event fired %d %X", __FUNCTION__, event, event);
+
+    if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
+        DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or "
+              "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__);
+        virEventRemoveHandle(fd);
+        return;
+    }
+
+    /* Read and deserialise length word. */
+    if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1)
+        return;
+
+    xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
+    if (!xdr_int (&xdr, &len)) {
+        /* Every xdrmem_create needs a matching xdr_destroy, even on error */
+        xdr_destroy (&xdr);
+        error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)"));
+        return;
+    }
+    xdr_destroy (&xdr);
+
+    /* Length includes length word - adjust to real length to read. */
+    len -= 4;
+
+    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
+        error (conn, VIR_ERR_RPC, _("packet received from server too large"));
+        return;
+    }
+
+    /* Read reply header and what follows (either a ret or an error). */
+    if (really_read (conn, priv, 0, buffer, len) == -1) {
+        error (conn, VIR_ERR_RPC, _("error reading buffer from memory"));
+        return;
+    }
+
+    /* Deserialise reply header. */
+    xdrmem_create (&xdr, buffer, len, XDR_DECODE);
+    if (!xdr_remote_message_header (&xdr, &hdr)) {
+        error (conn, VIR_ERR_RPC, _("invalid header in event firing"));
+        goto cleanup;
+    }
+
+    if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
+        hdr.direction == REMOTE_MESSAGE) {
+        DEBUG0("Encountered an async event");
+        remoteDomainProcessEvent(conn, &xdr);
+    } else {
+        DEBUG0("invalid proc in event firing");
+        error (conn, VIR_ERR_RPC, _("invalid proc in event firing"));
+    }
+
+cleanup:
+    /* Release the payload stream on success and failure alike */
+    xdr_destroy (&xdr);
+}
+
+/**
+ * remoteDomainEventQueueFlush:
+ * @timer: id of the timer that fired (not used)
+ * @opaque: the virConnectPtr passed when the timeout was registered
+ *
+ * Timeout callback that drains the connection's queue of pending domain
+ * events.  Each event is popped, delivered to every registered callback
+ * and then freed.  The flush timer is then set back to -1, the value it
+ * was created with (presumably "disabled" - confirm against the event
+ * API).
+ */
+void
+remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+    virConnectPtr conn = opaque;
+    struct private_data *priv = conn->privateData;
+    virDomainEventPtr pending;
+    void *cb_opaque = NULL;
+    int idx;
+
+    for (;;) {
+        pending = virDomainEventCallbackQueuePop(priv->domainEvents);
+        if (!pending)
+            break;
+
+        DEBUG(" Flushing %p", pending);
+        for (idx = 0; idx < priv->callbackList->count; idx++) {
+            /* Skip empty slots in the callback array */
+            if (!priv->callbackList->callbacks[idx])
+                continue;
+
+            cb_opaque = priv->callbackList->callbacks[idx]->opaque;
+            priv->callbackList->callbacks[idx]->cb(pending->dom->conn,
+                                                   pending->dom,
+                                                   pending->event,
+                                                   cb_opaque);
+        }
+        VIR_FREE(pending);
+    }
+
+    virEventUpdateTimeout(priv->eventFlushTimer, -1);
+}
[Date Prev][Date Next] [Thread Prev][Thread Next]
[Thread Index]
[Date Index]
[Author Index]