[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[libvirt] Re: [PATCH 03/12] Domain Events - daemon changes



[PATCH 03/12] Domain Events - daemon changes
This code changes the daemon to:
  use the public definition of virEventRegisterImpl
  add functionality to dispatch events to connected remote drivers

 event.c  |   21 +++++----
 event.h  |    5 +-
 mdns.c   |   15 ++++--
 qemud.c  |   72 ++++++++++++++++++++-----------
 qemud.h  |   11 ++++
 remote.c |  143 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 227 insertions(+), 40 deletions(-)
diff --git a/qemud/event.c b/qemud/event.c
index bb1f381..f391cd1 100644
--- a/qemud/event.c
+++ b/qemud/event.c
@@ -38,7 +38,7 @@
 /* State for a single file handle being monitored */
 struct virEventHandle {
     int fd;
-    int events;
+    virEventHandleType events;
     virEventHandleCallback cb;
     void *opaque;
     int deleted;
@@ -74,13 +74,13 @@ static struct virEventLoop eventLoop;
 /* Unique ID for the next timer to be registered */
 static int nextTimer = 0;
 
-
 /*
  * Register a callback for monitoring file handle events.
  * NB, it *must* be safe to call this from within a callback
  * For this reason we only ever append to existing list.
  */
-int virEventAddHandleImpl(int fd, int events, virEventHandleCallback cb, void *opaque) {
+int virEventAddHandleImpl(int fd, virEventHandleType events,
+                          virEventHandleCallback cb, void *opaque) {
     EVENT_DEBUG("Add handle %d %d %p %p", fd, events, cb, opaque);
     if (eventLoop.handlesCount == eventLoop.handlesAlloc) {
         EVENT_DEBUG("Used %d handle slots, adding %d more",
@@ -92,7 +92,8 @@ int virEventAddHandleImpl(int fd, int events, virEventHandleCallback cb, void *o
     }
 
     eventLoop.handles[eventLoop.handlesCount].fd = fd;
-    eventLoop.handles[eventLoop.handlesCount].events = events;
+    eventLoop.handles[eventLoop.handlesCount].events =
+                                         virEventHandleTypeToPollEvent(events);
     eventLoop.handles[eventLoop.handlesCount].cb = cb;
     eventLoop.handles[eventLoop.handlesCount].opaque = opaque;
     eventLoop.handles[eventLoop.handlesCount].deleted = 0;
@@ -102,11 +103,12 @@ int virEventAddHandleImpl(int fd, int events, virEventHandleCallback cb, void *o
     return 0;
 }
 
-void virEventUpdateHandleImpl(int fd, int events) {
+void virEventUpdateHandleImpl(int fd, virEventHandleType events) {
     int i;
     for (i = 0 ; i < eventLoop.handlesCount ; i++) {
         if (eventLoop.handles[i].fd == fd) {
-            eventLoop.handles[i].events = events;
+            eventLoop.handles[i].events =
+                    virEventHandleTypeToPollEvent(events);
             break;
         }
     }
@@ -342,6 +344,7 @@ static int virEventDispatchTimeouts(void) {
  */
 static int virEventDispatchHandles(struct pollfd *fds) {
     int i;
+    virEventHandleType hEvents;
     /* Save this now - it may be changed during dispatch */
     int nhandles = eventLoop.handlesCount;
 
@@ -352,8 +355,10 @@ static int virEventDispatchHandles(struct pollfd *fds) {
         }
 
         if (fds[i].revents) {
-            EVENT_DEBUG("Dispatch %d %d %p", fds[i].fd, fds[i].revents, eventLoop.handles[i].opaque);
-            (eventLoop.handles[i].cb)(fds[i].fd, fds[i].revents,
+            hEvents = virPollEventToEventHandleType(fds[i].revents);
+            EVENT_DEBUG("Dispatch %d %d %p", fds[i].fd, fds[i].revents,
+                        eventLoop.handles[i].opaque);
+            (eventLoop.handles[i].cb)(fds[i].fd, hEvents,
                                       eventLoop.handles[i].opaque);
         }
     }
diff --git a/qemud/event.h b/qemud/event.h
index adf7b6d..3fff617 100644
--- a/qemud/event.h
+++ b/qemud/event.h
@@ -36,7 +36,8 @@
  *
  * returns -1 if the file handle cannot be registered, 0 upon success
  */
-int virEventAddHandleImpl(int fd, int events, virEventHandleCallback cb, void *opaque);
+int virEventAddHandleImpl(int fd, virEventHandleType events,
+                          virEventHandleCallback cb, void *opaque);
 
 /**
  * virEventUpdateHandleImpl: change event set for a monitored file handle
@@ -46,7 +47,7 @@ int virEventAddHandleImpl(int fd, int events, virEventHandleCallback cb, void *o
  *
  * Will not fail if fd exists
  */
-void virEventUpdateHandleImpl(int fd, int events);
+void virEventUpdateHandleImpl(int fd, virEventHandleType events);
 
 /**
  * virEventRemoveHandleImpl: unregister a callback from a file handle
diff --git a/qemud/mdns.c b/qemud/mdns.c
index 9147946..df1b6c9 100644
--- a/qemud/mdns.c
+++ b/qemud/mdns.c
@@ -228,17 +228,20 @@ static void libvirtd_mdns_client_callback(AvahiClient *c, AvahiClientState state
 }
 
 
-static void libvirtd_mdns_watch_dispatch(int fd, int events, void *opaque)
+static void libvirtd_mdns_watch_dispatch(int fd, virEventHandleType events, 
+                                         void *opaque)
 {
     AvahiWatch *w = (AvahiWatch*)opaque;
-    AVAHI_DEBUG("Dispatch watch FD %d Event %d", fd, events);
-    w->revents = events;
-    w->callback(w, fd, events, w->userdata);
+    int fd_events = virEventHandleTypeToPollEvent(events);
+    AVAHI_DEBUG("Dispatch watch FD %d Event %d", fd, fd_events);
+    w->revents = fd_events;
+    w->callback(w, fd, fd_events, w->userdata);
 }
 
 static AvahiWatch *libvirtd_mdns_watch_new(const AvahiPoll *api ATTRIBUTE_UNUSED,
                                             int fd, AvahiWatchEvent event, AvahiWatchCallback cb, void *userdata) {
     AvahiWatch *w;
+    virEventHandleType hEvents;
     if (VIR_ALLOC(w) < 0)
         return NULL;
 
@@ -248,7 +251,9 @@ static AvahiWatch *libvirtd_mdns_watch_new(const AvahiPoll *api ATTRIBUTE_UNUSED
     w->userdata = userdata;
 
     AVAHI_DEBUG("New handle %p FD %d Event %d", w, w->fd, event);
-    if (virEventAddHandleImpl(fd, event, libvirtd_mdns_watch_dispatch, w) < 0) {
+    hEvents = virPollEventToEventHandleType(event);
+    if (virEventAddHandleImpl(fd, hEvents,
+                              libvirtd_mdns_watch_dispatch, w) < 0) {
         VIR_FREE(w);
         return NULL;
     }
diff --git a/qemud/qemud.c b/qemud/qemud.c
index 9da27d2..437841f 100644
--- a/qemud/qemud.c
+++ b/qemud/qemud.c
@@ -128,8 +128,10 @@ static void sig_handler(int sig, siginfo_t * siginfo,
     errno = origerrno;
 }
 
-static void qemudDispatchClientEvent(int fd, int events, void *opaque);
-static void qemudDispatchServerEvent(int fd, int events, void *opaque);
+static void qemudDispatchClientEvent(int fd, virEventHandleType events,
+                                     void *opaque);
+static void qemudDispatchServerEvent(int fd, virEventHandleType events,
+                                     void *opaque);
 static int qemudRegisterClientEvent(struct qemud_server *server,
                                     struct qemud_client *client,
                                     int removeFirst);
@@ -230,9 +232,10 @@ remoteInitializeGnuTLS (void)
     return 0;
 }
 
-static void qemudDispatchSignalEvent(int fd ATTRIBUTE_UNUSED,
-                                     int events ATTRIBUTE_UNUSED,
-                                     void *opaque) {
+static void
+qemudDispatchSignalEvent(int fd ATTRIBUTE_UNUSED,
+                         virEventHandleType events ATTRIBUTE_UNUSED,
+                         void *opaque) {
     struct qemud_server *server = (struct qemud_server *)opaque;
     siginfo_t siginfo;
     int ret;
@@ -521,10 +524,13 @@ static int qemudListenUnix(struct qemud_server *server,
     }
 
     if (virEventAddHandleImpl(sock->fd,
-                              POLLIN| POLLERR | POLLHUP,
+                              VIR_EVENT_HANDLE_READABLE |
+                                 VIR_EVENT_HANDLE_ERROR |
+                                 VIR_EVENT_HANDLE_HANGUP,
                               qemudDispatchServerEvent,
                               server) < 0) {
-        qemudLog(QEMUD_ERR, "%s", _("Failed to add server event callback"));
+        qemudLog(QEMUD_ERR, "%s",
+                 _("Failed to add server event callback"));
         goto cleanup;
     }
 
@@ -650,7 +656,9 @@ remoteListenTCP (struct qemud_server *server,
         }
 
         if (virEventAddHandleImpl(sock->fd,
-                                  POLLIN| POLLERR | POLLHUP,
+                                  VIR_EVENT_HANDLE_READABLE |
+                                      VIR_EVENT_HANDLE_ERROR |
+                                      VIR_EVENT_HANDLE_HANGUP,
                                   qemudDispatchServerEvent,
                                   server) < 0) {
             qemudLog(QEMUD_ERR, "%s", _("Failed to add server event callback"));
@@ -723,12 +731,12 @@ static struct qemud_server *qemudInitialize(int sigread) {
 
     server->sigread = sigread;
 
-    __virEventRegisterImpl(virEventAddHandleImpl,
-                           virEventUpdateHandleImpl,
-                           virEventRemoveHandleImpl,
-                           virEventAddTimeoutImpl,
-                           virEventUpdateTimeoutImpl,
-                           virEventRemoveTimeoutImpl);
+    virEventRegisterImpl(virEventAddHandleImpl,
+                         virEventUpdateHandleImpl,
+                         virEventRemoveHandleImpl,
+                         virEventAddTimeoutImpl,
+                         virEventUpdateTimeoutImpl,
+                         virEventRemoveTimeoutImpl);
 
     virStateInitialize();
 
@@ -1105,6 +1113,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
     client->auth = sock->auth;
     memcpy (&client->addr, &addr, sizeof addr);
     client->addrlen = addrlen;
+    client->server = server;
 
 #if HAVE_POLKIT
     /* Only do policy checks for non-root - allow root user
@@ -1199,6 +1208,12 @@ static void qemudDispatchClientFailure(struct qemud_server *server, struct qemud
 
     virEventRemoveHandleImpl(client->fd);
 
+    /* Deregister event delivery callback */
+    if(client->conn) {
+        qemudDebug("Deregistering to relay remote events");
+        virConnectDomainEventDeregister(client->conn, remoteRelayDomainEvent);
+    }
+
     if (client->conn)
         virConnectClose(client->conn);
 
@@ -1503,7 +1518,9 @@ static int qemudClientWrite(struct qemud_server *server,
 }
 
 
-static void qemudDispatchClientWrite(struct qemud_server *server, struct qemud_client *client) {
+void
+qemudDispatchClientWrite(struct qemud_server *server,
+                         struct qemud_client *client) {
     switch (client->mode) {
     case QEMUD_MODE_TX_PACKET: {
         if (qemudClientWrite(server, client) < 0)
@@ -1552,7 +1569,9 @@ static void qemudDispatchClientWrite(struct qemud_server *server, struct qemud_c
 }
 
 
-static void qemudDispatchClientEvent(int fd, int events, void *opaque) {
+static void
+qemudDispatchClientEvent(int fd, virEventHandleType events,
+                         void *opaque) {
     struct qemud_server *server = (struct qemud_server *)opaque;
     struct qemud_client *client = server->clients;
 
@@ -1566,9 +1585,9 @@ static void qemudDispatchClientEvent(int fd, int events, void *opaque) {
     if (!client)
         return;
 
-    if (events == POLLOUT)
+    if (events == VIR_EVENT_HANDLE_WRITABLE)
         qemudDispatchClientWrite(server, client);
-    else if (events == POLLIN)
+    else if (events == VIR_EVENT_HANDLE_READABLE)
         qemudDispatchClientRead(server, client);
     else
         qemudDispatchClientFailure(server, client);
@@ -1581,18 +1600,18 @@ static int qemudRegisterClientEvent(struct qemud_server *server,
     switch (client->mode) {
     case QEMUD_MODE_TLS_HANDSHAKE:
         if (gnutls_record_get_direction (client->tlssession) == 0)
-            mode = POLLIN;
+            mode = VIR_EVENT_HANDLE_READABLE;
         else
-            mode = POLLOUT;
+            mode = VIR_EVENT_HANDLE_WRITABLE;
         break;
 
     case QEMUD_MODE_RX_HEADER:
     case QEMUD_MODE_RX_PAYLOAD:
-        mode = POLLIN;
+        mode = VIR_EVENT_HANDLE_READABLE;
         break;
 
     case QEMUD_MODE_TX_PACKET:
-        mode = POLLOUT;
+        mode = VIR_EVENT_HANDLE_WRITABLE;
         break;
 
     default:
@@ -1604,7 +1623,8 @@ static int qemudRegisterClientEvent(struct qemud_server *server,
             return -1;
 
     if (virEventAddHandleImpl(client->fd,
-                              mode | POLLERR | POLLHUP,
+                              mode | VIR_EVENT_HANDLE_ERROR |
+                                     VIR_EVENT_HANDLE_HANGUP,
                               qemudDispatchClientEvent,
                               server) < 0)
             return -1;
@@ -1612,7 +1632,9 @@ static int qemudRegisterClientEvent(struct qemud_server *server,
     return 0;
 }
 
-static void qemudDispatchServerEvent(int fd, int events, void *opaque) {
+static void
+qemudDispatchServerEvent(int fd, virEventHandleType events,
+                         void *opaque) {
     struct qemud_server *server = (struct qemud_server *)opaque;
     struct qemud_socket *sock = server->sockets;
 
@@ -2215,7 +2237,7 @@ int main(int argc, char **argv) {
     }
 
     if (virEventAddHandleImpl(sigpipe[0],
-                              POLLIN,
+                              VIR_EVENT_HANDLE_READABLE,
                               qemudDispatchSignalEvent,
                               server) < 0) {
         qemudLog(QEMUD_ERR,
diff --git a/qemud/qemud.h b/qemud/qemud.h
index 91cb939..83d65b6 100644
--- a/qemud/qemud.h
+++ b/qemud/qemud.h
@@ -132,6 +132,9 @@ struct qemud_client {
      */
     virConnectPtr conn;
 
+    /* back-pointer to our server */
+    struct qemud_server *server;
+
     struct qemud_client *next;
 };
 
@@ -179,8 +182,16 @@ void qemudLog(int priority, const char *fmt, ...)
 void remoteDispatchClientRequest (struct qemud_server *server,
                                   struct qemud_client *client);
 
+void qemudDispatchClientWrite(struct qemud_server *server,
+                             struct qemud_client *client);
+
 #if HAVE_POLKIT
 int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid);
 #endif
 
+int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
+                            virDomainPtr dom,
+                            int event,
+                            void *opaque);
+
 #endif
diff --git a/qemud/remote.c b/qemud/remote.c
index 72e064e..a623494 100644
--- a/qemud/remote.c
+++ b/qemud/remote.c
@@ -75,6 +75,12 @@ typedef int (*dispatch_fn) (struct qemud_server *server,
                             char *args,
                             char *ret);
 
+/* Prototypes */
+static void
+remoteDispatchDomainEventSend (struct qemud_client *client,
+                               virDomainPtr dom,
+                               virDomainEventType event);
+
 /* This function gets called from qemud when it detects an incoming
  * remote protocol message.  At this point, client->buffer contains
  * the full call message (including length word which we skip).
@@ -405,6 +411,20 @@ remoteDispatchError (struct qemud_client *client,
     remoteDispatchSendError (client, req, VIR_ERR_RPC, msg);
 }
 
+int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
+                                   virDomainPtr dom,
+                                   int event,
+                                   void *opaque)
+{
+    struct qemud_client *client = opaque;
+    REMOTE_DEBUG("Relaying domain event %d", event);
+
+    if(client) {
+        remoteDispatchDomainEventSend (client, dom, event);
+        qemudDispatchClientWrite(client->server,client);
+    }
+    return 0;
+}
 
 
 /*----- Functions. -----*/
@@ -3620,6 +3640,129 @@ remoteDispatchStorageVolLookupByPath (struct qemud_server *server ATTRIBUTE_UNUS
 }
 
 
+/**************************
+ * Async Events
+ **************************/
+static int
+remoteDispatchDomainEvent (struct qemud_server *server ATTRIBUTE_UNUSED,
+                           struct qemud_client *client ATTRIBUTE_UNUSED,
+                           remote_message_header *req ATTRIBUTE_UNUSED,
+                           void *args ATTRIBUTE_UNUSED,
+                           remote_domain_event_ret *ret ATTRIBUTE_UNUSED)
+{
+    /* This call gets dispatched from a client call.
+     * This does not make sense, as this should not be initiated
+     * from the client side in generated code.
+     */
+     return -1;
+}
+
+/***************************
+ * Register / deregister events
+ ***************************/
+static int
+remoteDispatchDomainEventsRegister (struct qemud_server *server ATTRIBUTE_UNUSED,
+                                    struct qemud_client *client,
+                                    remote_message_header *req ATTRIBUTE_UNUSED,
+                                    void *args ATTRIBUTE_UNUSED,
+                                    remote_domain_events_register_ret *ret ATTRIBUTE_UNUSED)
+{
+    CHECK_CONN(client);
+
+    /* Register event delivery callback */
+    REMOTE_DEBUG("%s","Registering to relay remote events");
+    virConnectDomainEventRegister(client->conn, remoteRelayDomainEvent, client);
+
+    if(ret)
+        ret->cb_registered = 1;
+    return 0;
+}
+
+static int
+remoteDispatchDomainEventsDeregister (struct qemud_server *server ATTRIBUTE_UNUSED,
+                                      struct qemud_client *client,
+                                      remote_message_header *req ATTRIBUTE_UNUSED,
+                                      void *args ATTRIBUTE_UNUSED,
+                                      remote_domain_events_deregister_ret *ret ATTRIBUTE_UNUSED)
+{
+    CHECK_CONN(client);
+
+    /* Deregister event delivery callback */
+    REMOTE_DEBUG("%s","Deregistering to relay remote events");
+    virConnectDomainEventDeregister(client->conn, remoteRelayDomainEvent);
+
+    if(ret)
+        ret->cb_registered = 0;
+    return 0;
+}
+
+static void
+remoteDispatchDomainEventSend (struct qemud_client *client,
+                         virDomainPtr dom,
+                         virDomainEventType event)
+{
+    remote_message_header rep;
+    XDR xdr;
+    int len;
+    remote_domain_event_ret data;
+
+    if(!client) {
+        remoteDispatchError (client, NULL, "%s", _("Invalid Client"));
+        return;
+    }
+
+    rep.prog = REMOTE_PROGRAM;
+    rep.vers = REMOTE_PROTOCOL_VERSION;
+    rep.proc = REMOTE_PROC_DOMAIN_EVENT;
+    rep.direction = REMOTE_MESSAGE;
+    rep.serial = 1;
+    rep.status = REMOTE_OK;
+
+    /* Serialise the return header and event. */
+    xdrmem_create (&xdr, client->buffer, sizeof client->buffer, XDR_ENCODE);
+
+    len = 0; /* We'll come back and write this later. */
+    if (!xdr_int (&xdr, &len)) {
+        remoteDispatchError (client, NULL, "%s", _("xdr_int failed (1)"));
+        xdr_destroy (&xdr);
+        return;
+    }
+
+    if (!xdr_remote_message_header (&xdr, &rep)) {
+        xdr_destroy (&xdr);
+        return;
+    }
+
+    /* build return data */
+    make_nonnull_domain (&data.dom, dom);
+    data.event = (int) event;
+
+    if (!xdr_remote_domain_event_ret(&xdr, &data)) {
+        remoteDispatchError (client, NULL, "%s", _("serialise return struct"));
+        xdr_destroy (&xdr);
+        return;
+    }
+
+    len = xdr_getpos (&xdr);
+    if (xdr_setpos (&xdr, 0) == 0) {
+        remoteDispatchError (client, NULL, "%s", _("xdr_setpos failed"));
+        xdr_destroy (&xdr);
+        return;
+    }
+
+    if (!xdr_int (&xdr, &len)) {
+        remoteDispatchError (client, NULL, "%s", _("xdr_int failed (2)"));
+        xdr_destroy (&xdr);
+        return;
+    }
+
+    xdr_destroy (&xdr);
+
+    /* Send it. */
+    client->mode = QEMUD_MODE_TX_PACKET;
+    client->bufferLength = len;
+    client->bufferOffset = 0;
+}
 /*----- Helpers. -----*/
 
 /* get_nonnull_domain and get_nonnull_network turn an on-wire

[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]