[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

Re: [Libvir] PATCH 1/3: Split out simple event loop



The following patch adds a qemud/event.c & qemud/event.h file providing a
general purpose event loop built around poll. Users register file handles
and associated callbacks, and / or timers. The qemud.c file is changed to
make use of these APIs for dealing with server, client, and VM file handles
and/or sockets. This decouples much of the QEMU VM I/O code from the main
qemud.c daemon code.

 qemud/event.c   |  311 +++++++++++++++++++++++++++++++++++++++++
 qemud/event.h   |   13 +
 qemud/Makefile.am |    3 
 qemud/qemud.c     |  401 +++++++++++++++++++++++++++++-------------------------
 qemud/remote.c    |    5 
 5 files changed, 546 insertions(+), 187 deletions(-)


Dan.
-- 
|=- Red Hat, Engineering, Emerging Technologies, Boston.  +1 978 392 2496 -=|
|=-           Perl modules: http://search.cpan.org/~danberr/              -=|
|=-               Projects: http://freshmeat.net/~danielpb/               -=|
|=-  GnuPG: 7D3B9505   F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505  -=| 
diff -r 93de958458cb qemud/Makefile.am
--- a/qemud/Makefile.am	Fri Jun 15 14:27:39 2007 +0000
+++ b/qemud/Makefile.am	Sun Jun 17 16:26:41 2007 -0400
@@ -16,7 +16,8 @@ libvirt_qemud_SOURCES = \
 		buf.c buf.h \
 		protocol.h protocol.c \
 		remote_protocol.h remote_protocol.c \
-		remote.c
+		remote.c \
+                event.c event.h
 #-D_XOPEN_SOURCE=600 -D_XOPEN_SOURCE_EXTENDED=1 -D_POSIX_C_SOURCE=199506L
 libvirt_qemud_CFLAGS = \
         -I$(top_srcdir)/include -I$(top_builddir)/include $(LIBXML_CFLAGS) \
diff -r 93de958458cb qemud/event.c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/qemud/event.c	Sun Jun 17 16:26:34 2007 -0400
@@ -0,0 +1,311 @@
+
+#include <stdlib.h>
+#include <string.h>
+#include <poll.h>
+#include <sys/time.h>
+#include <errno.h>
+
+#include "event.h"
+
+struct virEventHandle {
+    int fd;
+    int events;
+    virEventHandleCallback cb;
+    void *opaque;
+    int deleted;
+};
+
+struct virEventTimeout {
+    int timer;
+    int timeout;
+    unsigned long long expiresAt;
+    virEventTimeoutCallback cb;
+    void *opaque;
+    int deleted;
+};
+
+struct virEventLoop {
+    int nhandles;
+    struct virEventHandle *handles;
+    int ntimeouts;
+    struct virEventTimeout *timeouts;
+};
+
+static struct virEventLoop eventLoop;
+static int nextTimer = 0;
+
+int virEventAddHandle(int fd, int events, virEventHandleCallback cb, void *opaque) {
+    struct virEventHandle *tmp;
+
+    printf("Add handle %d %d %p %p\n", fd, events, cb, opaque);
+    tmp = realloc(eventLoop.handles, sizeof(struct virEventHandle) * (eventLoop.nhandles+1));
+    if (!tmp) {
+        return -1;
+    }
+    eventLoop.handles = tmp;
+
+    eventLoop.handles[eventLoop.nhandles].fd = fd;
+    eventLoop.handles[eventLoop.nhandles].events = events;
+    eventLoop.handles[eventLoop.nhandles].cb = cb;
+    eventLoop.handles[eventLoop.nhandles].opaque = opaque;
+    eventLoop.handles[eventLoop.nhandles].deleted = 0;
+
+    eventLoop.nhandles++;
+
+    return 0;
+}
+
+int virEventRemoveHandle(int fd) {
+    int i;
+    printf("Remove handle %d\n", fd);
+    for (i = eventLoop.nhandles-1 ; i >= 0 ; i--) {
+        if (eventLoop.handles[i].fd == fd) {
+            printf("mark delete %d\n", i);
+            eventLoop.handles[i].deleted = 1;
+            return 0;
+        }
+    }
+    return -1;
+}
+
+
+int virEventAddTimeout(int timeout, virEventTimeoutCallback cb, void *opaque) {
+    struct virEventTimeout *tmp;
+    struct timeval tv;
+
+    if (gettimeofday(&tv, NULL) < 0) {
+        return -1;
+    }
+
+    tmp = realloc(eventLoop.timeouts, sizeof(struct virEventTimeout) * (eventLoop.ntimeouts+1));
+    if (!tmp) {
+        return -1;
+    }
+    eventLoop.timeouts = tmp;
+
+    eventLoop.timeouts[eventLoop.ntimeouts].timer = nextTimer++;
+    eventLoop.timeouts[eventLoop.ntimeouts].timeout = timeout;
+    eventLoop.timeouts[eventLoop.ntimeouts].cb = cb;
+    eventLoop.timeouts[eventLoop.ntimeouts].opaque = opaque;
+    eventLoop.timeouts[eventLoop.ntimeouts].deleted = 0;
+    eventLoop.timeouts[eventLoop.ntimeouts].expiresAt =
+        (((unsigned long long)tv.tv_sec)*1000) +
+        (((unsigned long long)tv.tv_usec)/1000);
+
+    eventLoop.ntimeouts++;
+
+    return 0;
+}
+
+int virEventRemoveTimeout(int timer) {
+    int i;
+    for (i = eventLoop.ntimeouts-1 ; i >=0 ; i--) {
+        if (eventLoop.timeouts[i].timer == timer) {
+            eventLoop.timeouts[i].deleted = 1;
+            return 0;
+        }
+    }
+    return -1;
+}
+
+
+static int virEventCalculateTimeout(int *timeout) {
+    unsigned long long then = 0;
+    int i;
+
+    /* Figure out if we need a timeout */
+    for (i = 0 ; i < eventLoop.ntimeouts ; i++) {
+        if (then == 0 ||
+            eventLoop.timeouts[i].expiresAt < then)
+            then = eventLoop.timeouts[i].expiresAt;
+    }
+
+    /* Calculate how long we should wait for a timeout if needed */
+    if (then > 0) {
+        struct timeval tv;
+
+        if (gettimeofday(&tv, NULL) < 0) {
+            return -1;
+        }
+
+        *timeout = then -
+            ((((unsigned long long)tv.tv_sec)*1000) +
+             (((unsigned long long)tv.tv_usec)/1000));
+
+        if (*timeout < 0)
+            *timeout = 1;
+    } else {
+        *timeout = -1;
+    }
+
+    return 0;
+}
+
+static struct pollfd *virEventMakePollFDs(void) {
+    struct pollfd *fds;
+    int i;
+
+    /* Setup the poll file handle data structs */
+    if (!(fds = malloc(sizeof(struct pollfd)*eventLoop.nhandles)))
+        return NULL;
+
+    for (i = 0 ; i < eventLoop.nhandles ; i++) {
+        fds[i].fd = eventLoop.handles[i].fd;
+        fds[i].events = eventLoop.handles[i].events;
+        fds[i].revents = 0;
+        printf("Wait for %d %d\n", eventLoop.handles[i].fd, eventLoop.handles[i].events);
+    }
+
+    return fds;
+}
+
+
+/* Figure out if any timeouts have expired */
+static int virEventDispatchTimeouts(void) {
+    struct timeval tv;
+    unsigned long long now;
+    int i;
+
+    if (gettimeofday(&tv, NULL) < 0) {
+        return -1;
+    }
+    now = (((unsigned long long)tv.tv_sec)*1000) +
+        (((unsigned long long)tv.tv_usec)/1000);
+
+    for (i = 0 ; i < eventLoop.ntimeouts ; i++) {
+        if (eventLoop.timeouts[i].deleted)
+            continue;
+
+        if (eventLoop.timeouts[i].expiresAt <= now) {
+            (eventLoop.timeouts[i].cb)(eventLoop.timeouts[i].timer,
+                                       eventLoop.timeouts[i].opaque);
+            eventLoop.timeouts[i].expiresAt =
+                now + eventLoop.timeouts[i].timeout;
+        }
+    }
+    return 0;
+}
+
+
+/* Figure out if any handles have events pending */
+static int virEventDispatchHandles(struct pollfd *fds) {
+    int i;
+
+    for (i = 0 ; i < eventLoop.nhandles ; i++) {
+        if (eventLoop.handles[i].deleted) {
+            printf("Skip deleted %d\n", eventLoop.handles[i].fd);
+            continue;
+        }
+
+        if (fds[i].revents) {
+            printf("Dispatch %d %d %p\n", fds[i].fd, fds[i].revents, eventLoop.handles[i].opaque);
+            (eventLoop.handles[i].cb)(fds[i].fd, fds[i].revents,
+                                      eventLoop.handles[i].opaque);
+        }
+    }
+
+    return 0;
+}
+
+static int virEventCleanupTimeouts(void) {
+    int i;
+    for (i = 0 ; i < eventLoop.ntimeouts ; ) {
+        struct virEventTimeout *tmp;
+        if (!eventLoop.handles[i].deleted) {
+            i++;
+            continue;
+        }
+
+        if ((i+1) < eventLoop.ntimeouts) {
+            memmove(eventLoop.timeouts+i,
+                    eventLoop.timeouts+i+1,
+                    sizeof(struct virEventTimeout)*(eventLoop.ntimeouts-(i+1)));
+        }
+
+        tmp = realloc(eventLoop.timeouts, sizeof(struct virEventTimeout) * (eventLoop.ntimeouts-1));
+        if (!tmp) {
+            return -1;
+        }
+        eventLoop.timeouts = tmp;
+        eventLoop.ntimeouts--;
+    }
+    return 0;
+}
+
+static int virEventCleanupHandles(void) {
+    int i;
+
+    for (i = 0 ; i < eventLoop.nhandles ; ) {
+        struct virEventHandle *tmp;
+        if (!eventLoop.handles[i].deleted) {
+            i++;
+            continue;
+        }
+
+        if ((i+1) < eventLoop.nhandles) {
+            memmove(eventLoop.handles+i,
+                    eventLoop.handles+i+1,
+                    sizeof(struct virEventHandle)*(eventLoop.nhandles-(i+1)));
+        }
+
+        tmp = realloc(eventLoop.handles, sizeof(struct virEventHandle) * (eventLoop.nhandles-1));
+        if (!tmp) {
+            return -1;
+        }
+        eventLoop.handles = tmp;
+        eventLoop.nhandles--;
+    }
+    return 0;
+}
+
+int virEventRunOnce(void) {
+    struct pollfd *fds;
+    int ret, timeout;
+
+    if (!(fds = virEventMakePollFDs()))
+        return -1;
+
+    if (virEventCalculateTimeout(&timeout) < 0) {
+        free(fds);
+        return -1;
+    }
+
+ retry:
+    printf("Poll on %d handles %p timeout %d\n", eventLoop.nhandles, fds, timeout);
+    ret = poll(fds, eventLoop.nhandles, timeout);
+    printf("Poll got %d event\n", ret);
+    if (ret < 0) {
+        if (errno == EINTR) {
+            goto retry;
+        }
+        free(fds);
+        return -1;
+    }
+    if (virEventDispatchTimeouts() < 0) {
+        free(fds);
+        return -1;
+    }
+
+    if (ret > 0 &&
+        virEventDispatchHandles(fds) < 0) {
+        free(fds);
+        return -1;
+    }
+    free(fds);
+
+    if (virEventCleanupTimeouts() < 0)
+        return -1;
+
+    if (virEventCleanupHandles() < 0)
+        return -1;
+
+    return 0;
+}
+/*
+ * Local variables:
+ *  indent-tabs-mode: nil
+ *  c-indent-level: 4
+ *  c-basic-offset: 4
+ *  tab-width: 4
+ * End:
+ */
diff -r 93de958458cb qemud/event.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/qemud/event.h	Sun Jun 17 16:26:34 2007 -0400
@@ -0,0 +1,13 @@
+
+
+typedef void (*virEventHandleCallback)(int fd, int events, void *opaque);
+
+int virEventAddHandle(int fd, int events, virEventHandleCallback cb, void *opaque);
+int virEventRemoveHandle(int fd);
+
+typedef void (*virEventTimeoutCallback)(int timer, void *opaque);
+
+int virEventAddTimeout(int timeout, virEventTimeoutCallback cb, void *opaque);
+int virEventRemoveTimeout(int timer);
+
+int virEventRunOnce(void);
diff -r 93de958458cb qemud/qemud.c
--- a/qemud/qemud.c	Fri Jun 15 14:27:39 2007 +0000
+++ b/qemud/qemud.c	Sun Jun 17 17:37:07 2007 -0400
@@ -61,6 +61,7 @@
 #include "driver.h"
 #include "conf.h"
 #include "iptables.h"
+#include "event.h"
 
 static int godaemon = 0;        /* -d: Be a daemon */
 static int verbose = 0;         /* -v: Verbose mode */
@@ -109,6 +110,13 @@ static void sig_handler(int sig) {
     }
     errno = origerrno;
 }
+
+static void qemudDispatchVMEvent(int fd, int events, void *opaque);
+static void qemudDispatchClientEvent(int fd, int events, void *opaque);
+static void qemudDispatchServerEvent(int fd, int events, void *opaque);
+static int qemudRegisterClientEvent(struct qemud_server *server,
+                                    struct qemud_client *client,
+                                    int remove);
 
 static int
 remoteInitializeGnuTLS (void)
@@ -184,8 +192,10 @@ remoteInitializeGnuTLS (void)
     return 0;
 }
 
-static int qemudDispatchSignal(struct qemud_server *server)
-{
+static void qemudDispatchSignalEvent(int fd ATTRIBUTE_UNUSED,
+                                     int events ATTRIBUTE_UNUSED,
+                                     void *opaque) {
+    struct qemud_server *server = (struct qemud_server *)opaque;
     unsigned char sigc;
     struct qemud_vm *vm;
     struct qemud_network *network;
@@ -194,7 +204,7 @@ static int qemudDispatchSignal(struct qe
     if (read(server->sigread, &sigc, 1) != 1) {
         qemudLog(QEMUD_ERR, "Failed to read from signal pipe: %s",
                  strerror(errno));
-        return -1;
+        return;
     }
 
     ret = 0;
@@ -266,7 +276,8 @@ static int qemudDispatchSignal(struct qe
         break;
     }
 
-    return ret;
+    if (ret != 0)
+        server->shutdown = 1;
 }
 
 static int qemudSetCloseExec(int fd) {
@@ -474,19 +485,16 @@ static int qemudListenUnix(struct qemud_
     }
 
     sock->readonly = readonly;
-    sock->next = server->sockets;
-    server->sockets = sock;
-    server->nsockets++;
 
     if ((sock->fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
         qemudLog(QEMUD_ERR, "Failed to create socket: %s",
                  strerror(errno));
-        return -1;
+        goto cleanup;
     }
 
     if (qemudSetCloseExec(sock->fd) < 0 ||
         qemudSetNonBlock(sock->fd) < 0)
-        return -1;
+        goto cleanup;
 
     memset(&addr, 0, sizeof(addr));
     addr.sun_family = AF_UNIX;
@@ -502,17 +510,35 @@ static int qemudListenUnix(struct qemud_
     if (bind(sock->fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
         qemudLog(QEMUD_ERR, "Failed to bind socket to '%s': %s",
                  path, strerror(errno));
-        return -1;
+        goto cleanup;
     }
     umask(oldmask);
 
     if (listen(sock->fd, 30) < 0) {
         qemudLog(QEMUD_ERR, "Failed to listen for connections on '%s': %s",
                  path, strerror(errno));
-        return -1;
-    }
+        goto cleanup;
+    }
+
+    if (virEventAddHandle(sock->fd,
+                          POLLIN| POLLERR | POLLHUP,
+                          qemudDispatchServerEvent,
+                          server) < 0) {
+        qemudLog(QEMUD_ERR, "Failed to add server event callback");
+        goto cleanup;
+    }
+
+    sock->next = server->sockets;
+    server->sockets = sock;
+    server->nsockets++;
 
     return 0;
+
+ cleanup:
+    if (sock->fd)
+        close(sock->fd);
+    free(sock);
+    return -1;
 }
 
 // See: http://people.redhat.com/drepper/userapi-ipv6.html
@@ -606,6 +632,15 @@ remoteListenTCP (struct qemud_server *se
                       "remoteListenTCP: listen: %s", strerror (errno));
             return -1;
         }
+
+        if (virEventAddHandle(sock->fd,
+                              POLLIN| POLLERR | POLLHUP,
+                              qemudDispatchServerEvent,
+                              server) < 0) {
+            qemudLog(QEMUD_ERR, "Failed to add server event callback");
+            return -1;
+        }
+
     }
 
     return 0;
@@ -1026,11 +1061,15 @@ static int qemudDispatchServer(struct qe
     if (!client->tls) {
         client->mode = QEMUD_MODE_RX_HEADER;
         client->bufferLength = QEMUD_PKT_HEADER_XDR_LEN;
+
+        if (qemudRegisterClientEvent (server, client, 0) < 0)
+            goto cleanup;
     } else {
         int ret;
 
         client->session = remoteInitializeTLSSession ();
-        if (client->session == NULL) goto tls_failed;
+        if (client->session == NULL)
+            goto cleanup;
 
         gnutls_transport_set_ptr (client->session,
                                   (gnutls_transport_ptr_t) (long) fd);
@@ -1040,16 +1079,22 @@ static int qemudDispatchServer(struct qe
         if (ret == 0) {
             /* Unlikely, but ...  Next step is to check the certificate. */
             if (remoteCheckAccess (client) == -1)
-                goto tls_failed;
+                goto cleanup;
+
+            if (qemudRegisterClientEvent(server, client, 0) < 0)
+                goto cleanup;
         } else if (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) {
             /* Most likely. */
             client->mode = QEMUD_MODE_TLS_HANDSHAKE;
             client->bufferLength = -1;
             client->direction = gnutls_record_get_direction (client->session);
+
+            if (qemudRegisterClientEvent (server, client, 0) < 0)
+                goto cleanup;
         } else {
             qemudLog (QEMUD_ERR, "TLS handshake failed: %s",
                       gnutls_strerror (ret));
-            goto tls_failed;
+            goto cleanup;
         }
     }
 
@@ -1059,7 +1104,7 @@ static int qemudDispatchServer(struct qe
 
     return 0;
 
- tls_failed:
+ cleanup:
     if (client->session) gnutls_deinit (client->session);
     close (fd);
     free (client);
@@ -1455,6 +1500,15 @@ int qemudStartVMDaemon(struct qemud_serv
         server->nactivevms++;
         server->nvmfds += 2;
 
+        virEventAddHandle(vm->stdout,
+                          POLLIN | POLLERR | POLLHUP,
+                          qemudDispatchVMEvent,
+                          server);
+        virEventAddHandle(vm->stderr,
+                          POLLIN | POLLERR | POLLHUP,
+                          qemudDispatchVMEvent,
+                          server);
+
         ret = 0;
 
         if (qemudWaitForMonitor(server, vm) < 0) {
@@ -1497,6 +1551,8 @@ static void qemudDispatchClientFailure(s
         tmp = tmp->next;
     }
 
+    virEventRemoveHandle(client->fd);
+
     if (client->tls && client->session) gnutls_deinit (client->session);
     close(client->fd);
     free(client);
@@ -1590,6 +1646,8 @@ static int qemudClientRead(struct qemud_
     } else {
         ret = gnutls_record_recv (client->session, data, len);
         client->direction = gnutls_record_get_direction (client->session);
+        if (qemudRegisterClientEvent (server, client, 1) < 0)
+            qemudDispatchClientFailure (server, client);
         if (ret <= 0) {
             if (ret == 0 || (ret != GNUTLS_E_AGAIN &&
                              ret != GNUTLS_E_INTERRUPTED)) {
@@ -1655,6 +1713,11 @@ static void qemudDispatchClientRead(stru
 
         xdr_destroy (&x);
 
+        if (qemudRegisterClientEvent(server, client, 1) < 0) {
+            qemudDispatchClientFailure(server, client);
+            return;
+        }
+
         /* Fall through */
     }
 
@@ -1679,6 +1742,8 @@ static void qemudDispatchClientRead(stru
 
         if (remote && h.prog == REMOTE_PROGRAM) {
             remoteDispatchClientRequest (server, client);
+            if (qemudRegisterClientEvent(server, client, 1) < 0)
+                qemudDispatchClientFailure(server, client);
         } else if (!remote && h.prog == QEMUD_PROGRAM) {
             qemud_packet_client p;
 
@@ -1689,6 +1754,9 @@ static void qemudDispatchClientRead(stru
             }
 
             qemudDispatchClientRequest(server, client, &p);
+
+            if (qemudRegisterClientEvent(server, client, 1) < 0)
+                qemudDispatchClientFailure(server, client);
         } else {
             /* An internal error. */
             qemudDebug ("Not REMOTE_PROGRAM or QEMUD_PROGRAM");
@@ -1709,12 +1777,17 @@ static void qemudDispatchClientRead(stru
             /* Finished.  Next step is to check the certificate. */
             if (remoteCheckAccess (client) == -1)
                 qemudDispatchClientFailure (server, client);
+            if (qemudRegisterClientEvent (server, client, 1) < 0)
+                qemudDispatchClientFailure (server, client);
         } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
             qemudLog (QEMUD_ERR, "TLS handshake failed: %s",
                       gnutls_strerror (ret));
             qemudDispatchClientFailure (server, client);
-        } else
+        } else {
             client->direction = gnutls_record_get_direction (client->session);
+            if (qemudRegisterClientEvent (server ,client, 1) < 0)
+                qemudDispatchClientFailure (server, client);
+        }
 
         break;
     }
@@ -1745,6 +1818,8 @@ static int qemudClientWrite(struct qemud
     } else {
         ret = gnutls_record_send (client->session, data, len);
         client->direction = gnutls_record_get_direction (client->session);
+        if (qemudRegisterClientEvent (server, client, 1) < 0)
+            qemudDispatchClientFailure (server, client);
         if (ret < 0) {
             if (ret != GNUTLS_E_INTERRUPTED && ret != GNUTLS_E_AGAIN) {
                 qemudLog (QEMUD_ERR, "gnutls_record_send: %s",
@@ -1772,6 +1847,9 @@ static void qemudDispatchClientWrite(str
             client->bufferLength = QEMUD_PKT_HEADER_XDR_LEN;
             client->bufferOffset = 0;
             if (client->tls) client->direction = QEMUD_TLS_DIRECTION_READ;
+
+            if (qemudRegisterClientEvent (server, client, 1) < 0)
+                qemudDispatchClientFailure (server, client);
         }
         /* Still writing */
         break;
@@ -1786,12 +1864,18 @@ static void qemudDispatchClientWrite(str
             /* Finished.  Next step is to check the certificate. */
             if (remoteCheckAccess (client) == -1)
                 qemudDispatchClientFailure (server, client);
+
+            if (qemudRegisterClientEvent (server, client, 1))
+                qemudDispatchClientFailure (server, client);
         } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
             qemudLog (QEMUD_ERR, "TLS handshake failed: %s",
                       gnutls_strerror (ret));
             qemudDispatchClientFailure (server, client);
-        } else
+        } else {
             client->direction = gnutls_record_get_direction (client->session);
+            if (qemudRegisterClientEvent (server, client, 1))
+                qemudDispatchClientFailure (server, client);
+        }
 
         break;
     }
@@ -1842,6 +1926,10 @@ int qemudShutdownVMDaemon(struct qemud_s
 
     qemudVMData(server, vm, vm->stdout);
     qemudVMData(server, vm, vm->stderr);
+
+    virEventRemoveHandle(vm->stdout);
+    virEventRemoveHandle(vm->stderr);
+
     if (close(vm->logfile) < 0)
         qemudLog(QEMUD_WARN, "Unable to close logfile %d: %s", errno, strerror(errno));
     close(vm->stdout);
@@ -2340,94 +2428,107 @@ int qemudShutdownNetworkDaemon(struct qe
 }
 
 
-static int qemudDispatchPoll(struct qemud_server *server, struct pollfd *fds) {
+static void qemudDispatchVMEvent(int fd, int events, void *opaque) {
+    struct qemud_server *server = (struct qemud_server *)opaque;
+    struct qemud_vm *vm = server->vms;
+
+    while (vm) {
+        if (qemudIsActiveVM(vm) &&
+            (vm->stdout == fd ||
+             vm->stderr == fd))
+            break;
+
+        vm = vm->next;
+    }
+
+    if (!vm)
+        return;
+
+    if (events == POLLIN &&
+        qemudDispatchVMLog(server, vm, fd) == 0)
+        return;
+
+    qemudDispatchVMFailure(server, vm, fd);
+}
+
+static void qemudDispatchClientEvent(int fd, int events, void *opaque) {
+    struct qemud_server *server = (struct qemud_server *)opaque;
+    struct qemud_client *client = server->clients;
+
+    while (client) {
+        if (client->fd == fd)
+            break;
+
+        client = client->next;
+    }
+
+    if (!client)
+        return;
+
+    if (events == POLLOUT)
+        qemudDispatchClientWrite(server, client);
+    else if (events == POLLIN)
+        qemudDispatchClientRead(server, client);
+    else
+        qemudDispatchClientFailure(server, client);
+}
+
+static int qemudRegisterClientEvent(struct qemud_server *server,
+                                    struct qemud_client *client,
+                                    int removeFirst) {
+    if (removeFirst)
+        if (virEventRemoveHandle(client->fd) < 0)
+            return -1;
+
+    if (client->tls) {
+        if (virEventAddHandle(client->fd,
+                              (client->direction ?
+                               POLLOUT : POLLIN) | POLLERR | POLLHUP,
+                              qemudDispatchClientEvent,
+                              server) < 0)
+            return -1;
+    } else {
+        if (virEventAddHandle(client->fd,
+                              (client->mode == QEMUD_MODE_TX_PACKET ?
+                               POLLOUT : POLLIN) | POLLERR | POLLHUP,
+                              qemudDispatchClientEvent,
+                              server) < 0)
+            return -1;
+    }
+
+    return 0;
+}
+
+static void qemudDispatchServerEvent(int fd, int events, void *opaque) {
+    struct qemud_server *server = (struct qemud_server *)opaque;
     struct qemud_socket *sock = server->sockets;
-    struct qemud_client *client = server->clients;
-    struct qemud_vm *vm;
-    struct qemud_network *network;
-    int ret = 0;
-    int fd = 0;
-
-    if (fds[fd++].revents && qemudDispatchSignal(server) < 0)
-        return -1;
-
-    if (server->shutdown)
-        return 0;
-
-    vm = server->vms;
+
+    printf("Server event %d %d %p\n", fd, events, opaque);
+
+    while (sock) {
+        if (sock->fd == fd)
+            break;
+
+        sock = sock->next;
+    }
+
+    printf("Server socket %p\n", sock);
+    if (!sock)
+        return;
+
+    if (events)
+        qemudDispatchServer(server, sock);
+}
+
+
+static void qemudCleanupInactive(struct qemud_server *server) {
+    struct qemud_vm *vm = server->vms;
+    struct qemud_network *network = server->networks;
+
+    /* Cleanup any VMs which shutdown & dont have an associated
+       config file */
     while (vm) {
         struct qemud_vm *next = vm->next;
-        int failed = 0,
-            stdoutfd = vm->stdout,
-            stderrfd = vm->stderr;
-
-        if (!qemudIsActiveVM(vm)) {
-            vm = next;
-            continue;
-        }
-
-        if (stdoutfd != -1) {
-            if (fds[fd].revents) {
-                if (fds[fd].revents == POLLIN) {
-                    if (qemudDispatchVMLog(server, vm, fds[fd].fd) < 0)
-                        failed = 1;
-                } else {
-                    if (qemudDispatchVMFailure(server, vm, fds[fd].fd) < 0)
-                        failed = 1;
-                }
-            }
-            fd++;
-        }
-        if (stderrfd != -1) {
-            if (!failed) {
-                if (fds[fd].revents) {
-                    if (fds[fd].revents == POLLIN) {
-                        if (qemudDispatchVMLog(server, vm, fds[fd].fd) < 0)
-                            failed = 1;
-                    } else {
-                        if (qemudDispatchVMFailure(server, vm, fds[fd].fd) < 0)
-                            failed = 1;
-                    }
-                }
-            }
-            fd++;
-        }
-        vm = next;
-        if (failed)
-            ret = -1; /* FIXME: the daemon shouldn't exit on failure here */
-    }
-    while (client) {
-        struct qemud_client *next = client->next;
-
-        assert (client->magic == QEMUD_CLIENT_MAGIC);
-
-        if (fds[fd].revents) {
-            qemudDebug("Poll data normal");
-            if (fds[fd].revents == POLLOUT)
-                qemudDispatchClientWrite(server, client);
-            else if (fds[fd].revents == POLLIN)
-                qemudDispatchClientRead(server, client);
-            else
-                qemudDispatchClientFailure(server, client);
-        }
-        fd++;
-        client = next;
-    }
-    while (sock) {
-        struct qemud_socket *next = sock->next;
-        /* FIXME: the daemon shouldn't exit on error here */
-        if (fds[fd].revents)
-            if (qemudDispatchServer(server, sock) < 0)
-                return -1;
-        fd++;
-        sock = next;
-    }
-
-    /* Cleanup any VMs which shutdown & dont have an associated
-       config file */
-    vm = server->vms;
-    while (vm) {
-        struct qemud_vm *next = vm->next;
 
         if (!qemudIsActiveVM(vm) && !vm->configFile[0])
             qemudRemoveInactiveVM(server, vm);
@@ -2436,7 +2537,6 @@ static int qemudDispatchPoll(struct qemu
     }
 
     /* Cleanup any networks too */
-    network = server->networks;
     while (network) {
         struct qemud_network *next = network->next;
 
@@ -2446,91 +2546,16 @@ static int qemudDispatchPoll(struct qemu
         network = next;
     }
 
-    return ret;
-}
-
-static void qemudPreparePoll(struct qemud_server *server, struct pollfd *fds) {
-    int  fd = 0;
-    struct qemud_socket *sock;
-    struct qemud_client *client;
-    struct qemud_vm *vm;
-
-    fds[fd].fd = server->sigread;
-    fds[fd].events = POLLIN;
-    fd++;
-
-    for (vm = server->vms ; vm ; vm = vm->next) {
-        if (!qemudIsActiveVM(vm))
-            continue;
-        if (vm->stdout != -1) {
-            fds[fd].fd = vm->stdout;
-            fds[fd].events = POLLIN | POLLERR | POLLHUP;
-            fd++;
-        }
-        if (vm->stderr != -1) {
-            fds[fd].fd = vm->stderr;
-            fds[fd].events = POLLIN | POLLERR | POLLHUP;
-            fd++;
-        }
-    }
-    for (client = server->clients ; client ; client = client->next) {
-        fds[fd].fd = client->fd;
-        if (!client->tls) {
-            /* Refuse to read more from client if tx is pending to
-               rate limit */
-            if (client->mode == QEMUD_MODE_TX_PACKET)
-                fds[fd].events = POLLOUT | POLLERR | POLLHUP;
-            else
-                fds[fd].events = POLLIN | POLLERR | POLLHUP;
-        } else {
-            qemudDebug ("direction = %s",
-                        client->direction ? "WRITE" : "READ");
-            fds[fd].events = client->direction ? POLLOUT : POLLIN;
-            fds[fd].events |= POLLERR | POLLHUP;
-        }
-        fd++;
-    }
-    for (sock = server->sockets ; sock ; sock = sock->next) {
-        fds[fd].fd = sock->fd;
-        fds[fd].events = POLLIN;
-        fd++;
-    }
+    return;
 }
 
 
 
 static int qemudOneLoop(struct qemud_server *server) {
-    int nfds = server->nsockets + server->nclients + server->nvmfds + 1; /* server->sigread */
-    struct pollfd fds[nfds];
-    int thistimeout = -1;
-    int ret;
     sig_atomic_t errors;
 
-    /* If we have no clients or vms, then timeout after
-       30 seconds, letting daemon exit */
-    if (timeout > 0 &&
-        !server->nclients &&
-        !server->nactivevms)
-        thistimeout = timeout;
-
-    qemudPreparePoll(server, fds);
-
- retry:
-
-    if ((ret = poll(fds, nfds, thistimeout * 1000)) < 0) {
-        if (errno == EINTR) {
-            goto retry;
-        }
-        qemudLog(QEMUD_ERR, "Error polling on file descriptors: %s",
-                 strerror(errno));
-        return -1;
-    }
-
-    /* Must have timed out */
-    if (ret == 0) {
-        qemudLog(QEMUD_INFO, "Timed out while polling on file descriptors");
-        return -1;
-    }
+    if (virEventRunOnce() < 0)
+        return -1;
 
     /* Check for any signal handling errors and log them. */
     errors = sig_errors;
@@ -2542,8 +2567,7 @@ static int qemudOneLoop(struct qemud_ser
         return -1;
     }
 
-    if (qemudDispatchPoll(server, fds) < 0)
-        return -1;
+    qemudCleanupInactive(server);
 
     return 0;
 }
@@ -2941,6 +2965,15 @@ int main(int argc, char **argv) {
         goto error2;
     }
 
+    if (virEventAddHandle(sigpipe[0],
+                          POLLIN,
+                          qemudDispatchSignalEvent,
+                          server) < 0) {
+        qemudLog(QEMUD_ERR, "Failed to register callback for signal pipe");
+        ret = 3;
+        goto error2;
+    }
+
     qemudRunLoop(server);
 
     qemudCleanup(server);
diff -r 93de958458cb qemud/remote.c
--- a/qemud/remote.c	Fri Jun 15 14:27:39 2007 +0000
+++ b/qemud/remote.c	Sun Jun 17 16:26:54 2007 -0400
@@ -691,7 +691,7 @@ remoteDispatchDomainGetInfo (struct qemu
         remoteDispatchError (client, req, "domain not found");
         return -2;
     }
-
+    printf("------------------------------------- %d %s\n", dom->id, dom->name);
     if (virDomainGetInfo (dom, &info) == -1)
         return -1;
 
@@ -862,7 +862,7 @@ remoteDispatchDomainLookupById (struct q
 
     dom = virDomainLookupByID (client->conn, args->id);
     if (dom == NULL) return -1;
-
+    printf("******************Loopu %d %s\n", dom->id, dom->name);
     make_nonnull_domain (&ret->dom, dom);
 
     return 0;
@@ -1539,6 +1539,7 @@ get_nonnull_domain (virConnectPtr conn, 
     /* Should we believe the domain.id sent by the client?  Maybe
      * this should be a check rather than an assignment? XXX
      */
+    printf("********************* %d\n", domain.id);
     if (dom) dom->id = domain.id;
     return dom;
 }

[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]