[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[libvirt] Re: [PATCH 07/12] Domain Events - remote driver



[PATCH 07/12] Domain Events - remote driver
Deliver local callbacks in response to remote events

 remote_internal.c |  276 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 271 insertions(+), 5 deletions(-)
diff --git a/src/remote_internal.c b/src/remote_internal.c
index 35b7b4b..8875674 100644
--- a/src/remote_internal.c
+++ b/src/remote_internal.c
@@ -34,6 +34,7 @@
 #include <signal.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <sys/poll.h>
 #include <fcntl.h>
 
 #ifdef HAVE_SYS_WAIT_H
@@ -73,6 +74,7 @@
 #include "remote_protocol.h"
 #include "memory.h"
 #include "util.h"
+#include "event.h"
 
 /* Per-connection private data. */
 #define MAGIC 999               /* private_data->magic if OK */
@@ -97,6 +99,13 @@ struct private_data {
     unsigned int saslDecodedLength;
     unsigned int saslDecodedOffset;
 #endif
+    /* The list of domain event callbacks */
+    virDomainEventCallbackListPtr callbackList;
+    /* The queue of domain events generated
+       during a call / response rpc          */
+    virDomainEventQueuePtr domainEvents;
+    /* Timer for flushing domainEvents queue */
+    int eventFlushTimer;
 };
 
 #define GET_PRIVATE(conn,retcode)                                       \
@@ -156,7 +165,10 @@ static void make_nonnull_domain (remote_nonnull_domain *dom_dst, virDomainPtr do
 static void make_nonnull_network (remote_nonnull_network *net_dst, virNetworkPtr net_src);
 static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src);
 static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src);
-
+/* Forward declarations for the domain event machinery.
+ * remoteDomainEventFired and remoteDomainEventQueueFlush are non-static;
+ * doRemoteOpen passes them to virEventAddHandle() and
+ * virEventAddTimeout() respectively. */
+void remoteDomainEventFired(int fd, virEventHandleType event, void *data);
+static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr);
+static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr);
+void remoteDomainEventQueueFlush(int timer, void *opaque);
 /*----------------------------------------------------------------------*/
 
 /* Helper functions for remoteOpen. */
@@ -680,6 +692,36 @@ doRemoteOpen (virConnectPtr conn,
               (xdrproc_t) xdr_void, (char *) NULL) == -1)
         goto failed;
 
+    if(VIR_ALLOC(priv->callbackList)<0) {
+        error(conn, VIR_ERR_INVALID_ARG, _("Error allocating callbacks list"));
+        goto failed;
+    }
+
+    if(VIR_ALLOC(priv->domainEvents)<0) {
+        error(conn, VIR_ERR_INVALID_ARG, _("Error allocating domainEvents"));
+        goto failed;
+    }
+
+    DEBUG0("Adding Handler for remote events");
+    /* Set up a callback to listen on the socket data */
+    if (virEventAddHandle(priv->sock,
+                          VIR_EVENT_HANDLE_READABLE |
+                              VIR_EVENT_HANDLE_ERROR |
+                              VIR_EVENT_HANDLE_HANGUP,
+                          remoteDomainEventFired,
+                          conn) < 0) {
+        DEBUG0("virEventAddHandle failed: No addHandleImpl defined."
+               " continuing without events.");
+    } else {
+
+        DEBUG0("Adding Timeout for remote event queue flushing");
+        if ( (priv->eventFlushTimer = virEventAddTimeout(-1,
+                                               remoteDomainEventQueueFlush,
+                                               conn)) < 0) {
+            DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. "
+                    "continuing without events.");
+        }
+    }
     /* Successful. */
     retcode = VIR_DRV_OPEN_SUCCESS;
 
@@ -1101,6 +1143,11 @@ doRemoteClose (virConnectPtr conn, struct private_data *priv)
               (xdrproc_t) xdr_void, (char *) NULL) == -1)
         return -1;
 
+    /* Remove handle for remote events */
+    virEventRemoveHandle(priv->sock);
+    /* Remove timeout */
+    virEventRemoveTimeout(priv->eventFlushTimer);
+
     /* Close socket. */
     if (priv->uses_tls && priv->session) {
         gnutls_bye (priv->session, GNUTLS_SHUT_RDWR);
@@ -1132,6 +1179,12 @@ doRemoteClose (virConnectPtr conn, struct private_data *priv)
     /* Free private data. */
     priv->magic = DEAD;
 
+    /* Free callback list */
+    virDomainEventCallbackListFree(priv->callbackList);
+
+    /* Free queued events */
+    virDomainEventQueueFree(priv->domainEvents);
+
     return 0;
 }
 
@@ -4288,6 +4341,52 @@ remoteAuthPolkit (virConnectPtr conn, struct private_data *priv, int in_open,
     return 0;
 }
 #endif /* HAVE_POLKIT */
+/*----------------------------------------------------------------------*/
+
+/**
+ * remoteDomainEventRegister:
+ * @conn: connection whose private data holds the callback list
+ * @callback: domain event callback to add to the list
+ * @opaque: user data stored alongside @callback
+ *
+ * Adds @callback to the connection's local callback list.  When it is
+ * the first entry on the list, REMOTE_PROC_DOMAIN_EVENTS_REGISTER is
+ * sent to the server.
+ *
+ * Returns 0 on success, -1 if the callback could not be added or the
+ * remote call failed.
+ */
+static int remoteDomainEventRegister (virConnectPtr conn,
+                               void *callback,
+                               void *opaque)
+{
+    struct private_data *priv = conn->privateData;
+
+    if (virDomainEventCallbackListAdd(conn, priv->callbackList,
+                                  callback, opaque) < 0) {
+         error (conn, VIR_ERR_RPC, _("adding cb to list"));
+         return -1;
+    }
+
+    if ( priv->callbackList->count == 1 ) {
+        /* Tell the server when we are the first callback registering */
+        if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_REGISTER,
+                (xdrproc_t) xdr_void, (char *) NULL,
+                (xdrproc_t) xdr_void, (char *) NULL) == -1) {
+            /* Best-effort rollback: without it the list would keep an
+             * entry for a registration that failed, and a retry would
+             * no longer see count == 1 and so never contact the server. */
+            (void) virDomainEventCallbackListRemove(conn, priv->callbackList,
+                                                    callback);
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+/**
+ * remoteDomainEventDeregister:
+ * @conn: connection whose private data holds the callback list
+ * @callback: domain event callback to remove from the list
+ *
+ * Removes @callback from the connection's local callback list.  When
+ * the list becomes empty, REMOTE_PROC_DOMAIN_EVENTS_DEREGISTER is sent
+ * to the server.
+ *
+ * Returns 0 on success, -1 if the callback could not be removed or the
+ * remote call failed.
+ */
+static int remoteDomainEventDeregister (virConnectPtr conn,
+                                 void *callback)
+{
+    struct private_data *priv = conn->privateData;
+
+    if (virDomainEventCallbackListRemove(conn, priv->callbackList,
+                                  callback) < 0) {
+         error (conn, VIR_ERR_RPC, _("removing cb from list"));
+         return -1;
+    }
+
+    if ( priv->callbackList->count == 0 ) {
+        /* Tell the server when we are the last callback deregistering */
+        if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_DEREGISTER,
+                (xdrproc_t) xdr_void, (char *) NULL,
+                (xdrproc_t) xdr_void, (char *) NULL) == -1)
+            return -1;
+    }
+
+    return 0;
+}
 
 /*----------------------------------------------------------------------*/
 
@@ -4367,6 +4466,7 @@ call (virConnectPtr conn, struct private_data *priv,
         really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1)
         return -1;
 
+retry_read:
     /* Read and deserialise length word. */
     if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1)
         return -1;
@@ -4418,10 +4518,20 @@ call (virConnectPtr conn, struct private_data *priv,
         return -1;
     }
 
-    /* If we extend the server to actually send asynchronous messages, then
-     * we'll need to change this so that it can recognise an asynch
-     * message being received at this point.
-     */
+    if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
+        hdr.direction == REMOTE_MESSAGE) {
+        /* An async message has come in while we were waiting for the
+         * response. Process it to pull it off the wire, and try again
+         */
+        DEBUG0("Encountered an event while waiting for a response");
+
+        remoteDomainQueueEvent(conn, &xdr);
+        virEventUpdateTimeout(priv->eventFlushTimer, 0);
+
+        DEBUG0("Retrying read");
+        xdr_destroy (&xdr);
+        goto retry_read;
+    }
     if (hdr.proc != proc_nr) {
         __virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
                          NULL, NULL, VIR_FROM_REMOTE,
@@ -4872,6 +4982,8 @@ static virDriver driver = {
     .domainMemoryPeek = remoteDomainMemoryPeek,
     .nodeGetCellsFreeMemory = remoteNodeGetCellsFreeMemory,
     .getFreeMemory = remoteNodeGetFreeMemory,
+    .domainEventRegister = remoteDomainEventRegister,
+    .domainEventDeregister = remoteDomainEventDeregister,
 };
 
 static virNetworkDriver network_driver = {
@@ -4957,3 +5069,157 @@ remoteRegister (void)
 
     return 0;
 }
+
+/**
+ * remoteDomainReadEvent:
+ * @conn: connection the event arrived on
+ * @xdr: decode stream to read the remote_domain_event_ret payload from
+ * @dom: filled with the domain object built from the decoded payload
+ * @event: filled with the decoded event value
+ *
+ * Read the event data off the wire.
+ *
+ * Returns 0 on success, -1 if the payload could not be unmarshalled or
+ * no domain object could be obtained for it.
+ */
+static int
+remoteDomainReadEvent(virConnectPtr conn, XDR *xdr,
+                      virDomainPtr *dom, int *event)
+{
+    remote_domain_event_ret ret;
+    memset (&ret, 0, sizeof ret);
+
+    /* unmarshall parameters, and process it*/
+    if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
+        error (conn, VIR_ERR_RPC,
+               _("remoteDomainProcessEvent: unmarshalling ret"));
+        return -1;
+    }
+
+    *dom = get_nonnull_domain(conn,ret.dom);
+    *event = ret.event;
+
+    /* Release whatever the XDR decoder allocated for ret.
+     * NOTE(review): assumes get_nonnull_domain keeps no pointers into
+     * ret.dom after it returns - confirm against its definition. */
+    xdr_free ((xdrproc_t) xdr_remote_domain_event_ret, (char *) &ret);
+
+    /* Callers hand the domain to callbacks and later dereference it
+     * (dom->conn), so a missing domain must be reported as a failure.
+     * NOTE(review): presumes get_nonnull_domain signals failure by
+     * returning NULL - confirm. */
+    if (*dom == NULL)
+        return -1;
+
+    return 0;
+}
+
+/* Decode one domain event from @xdr and invoke every non-NULL entry of
+ * the connection's callback list with it right away (no queueing).
+ * Nothing is dispatched when the event cannot be read. */
+static void
+remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr)
+{
+    struct private_data *priv = conn->privateData;
+    virDomainPtr domain;
+    int evt;
+    int idx;
+
+    if (remoteDomainReadEvent(conn, xdr, &domain, &evt) != 0)
+        return;
+
+    DEBUG0("Calling domain event callbacks (no queue)");
+    for (idx = 0; idx < priv->callbackList->count; idx++) {
+        /* Skip empty slots in the callback list. */
+        if (!priv->callbackList->callbacks[idx])
+            continue;
+
+        priv->callbackList->callbacks[idx]->cb(conn, domain, evt,
+                              priv->callbackList->callbacks[idx]->opaque);
+    }
+}
+
+/* Decode one domain event from @xdr and push it onto the connection's
+ * domainEvents queue instead of dispatching it.  A failed push is only
+ * logged; a failed read is silently dropped here. */
+static void
+remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
+{
+    struct private_data *priv = conn->privateData;
+    virDomainPtr domain;
+    int evt;
+
+    if (remoteDomainReadEvent(conn, xdr, &domain, &evt) != 0)
+        return;
+
+    if (virDomainEventCallbackQueuePush(priv->domainEvents,
+                                        domain, evt) >= 0)
+        return;
+
+    DEBUG("%s", "Error adding event to queue");
+}
+
+/** remoteDomainEventFired:
+ * @fd: file descriptor the event was reported on
+ * @event: bitmask of VIR_EVENT_HANDLE_* conditions that fired
+ * @opaque: the virConnectPtr this handler was registered with
+ *
+ * The callback for monitoring the remote socket
+ * for event data.  On hangup or error the handle for @fd is removed.
+ * Otherwise one message is read from the connection and, if it is a
+ * REMOTE_PROC_DOMAIN_EVENT message, it is dispatched directly to the
+ * registered callbacks; any other message is reported as an error.
+ */
+void
+remoteDomainEventFired(int fd,
+                       virEventHandleType event,
+                       void *opaque)
+{
+    char buffer[REMOTE_MESSAGE_MAX];
+    char buffer2[4];
+    struct remote_message_header hdr;
+    XDR xdr;
+    int len;
+
+    virConnectPtr        conn = opaque;
+    struct private_data *priv = conn->privateData;
+
+    DEBUG("%s : Event fired %d %X", __FUNCTION__, event, event);
+
+    if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
+         DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or "
+               "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__);
+         virEventRemoveHandle(fd);
+         return;
+    }
+
+    /* Read and deserialise length word. */
+    if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1)
+        return;
+
+    xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
+    if (!xdr_int (&xdr, &len)) {
+        error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)"));
+        /* Pair every xdrmem_create with xdr_destroy, on failure too. */
+        xdr_destroy (&xdr);
+        return;
+    }
+    xdr_destroy (&xdr);
+
+    /* Length includes length word - adjust to real length to read. */
+    len -= 4;
+
+    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
+        error (conn, VIR_ERR_RPC, _("packet received from server too large"));
+        return;
+    }
+
+    /* Read reply header and what follows (either a ret or an error). */
+    if (really_read (conn, priv, 0, buffer, len) == -1) {
+        error (conn, VIR_ERR_RPC, _("error reading buffer from memory"));
+        return;
+    }
+
+    /* Deserialise reply header. */
+    xdrmem_create (&xdr, buffer, len, XDR_DECODE);
+    if (!xdr_remote_message_header (&xdr, &hdr)) {
+        error (conn, VIR_ERR_RPC, _("invalid header in event firing"));
+        goto cleanup;
+    }
+
+    if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
+        hdr.direction == REMOTE_MESSAGE) {
+        DEBUG0("Encountered an async event");
+        remoteDomainProcessEvent(conn, &xdr);
+    } else {
+        DEBUG0("invalid proc in event firing");
+        error (conn, VIR_ERR_RPC, _("invalid proc in event firing"));
+    }
+
+cleanup:
+    /* Release the message stream on every path that created it. */
+    xdr_destroy (&xdr);
+}
+
+/* Timer callback: pop every queued domain event and deliver it to each
+ * non-NULL entry of the connection's callback list, freeing the event
+ * afterwards.  Once the queue is empty the flush timer is updated with
+ * a timeout of -1. */
+void
+remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+    virConnectPtr conn = opaque;
+    struct private_data *priv = conn->privateData;
+    virDomainEventPtr pending;
+    int idx;
+
+    for (;;) {
+        pending = virDomainEventCallbackQueuePop(priv->domainEvents);
+        if (!pending)
+            break;
+
+        DEBUG("   Flushing %p", pending);
+        for (idx = 0 ; idx < priv->callbackList->count ; idx++) {
+            /* Skip empty slots in the callback list. */
+            if (!priv->callbackList->callbacks[idx])
+                continue;
+
+            priv->callbackList->callbacks[idx]->cb(
+                pending->dom->conn,
+                pending->dom,
+                pending->event,
+                priv->callbackList->callbacks[idx]->opaque);
+        }
+        VIR_FREE(pending);
+    }
+
+    virEventUpdateTimeout(priv->eventFlushTimer, -1);
+}

[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]