[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

Re: [libvirt] PATCH: 8/25: Concurrent dispatch of RPC methods



This patch re-writes the code for dispatching RPC calls in the
remote driver to allow use from multiple threads. Only one thread
is allowed to send/recv on the socket at a time though. If another
thread comes along it will put itself on a queue and go to sleep.
The first thread may actually get around to transmitting the 2nd
thread's request while it is waiting for its own reply. It may
even get the 2nd thread's reply, if its own RPC call is being really
slow. So when a thread wakes up from sleeping, it has to check
whether its own RPC call has already been processed. Likewise when
a thread owning the socket finishes with its own work, it may have
to pass the buck to another thread. The upshot of this is that
we have multiple RPC calls executing in parallel, and requests+replies
are no longer guaranteed to be FIFO on the wire if talking to a new
enough server.

This refactoring required use of a self-pipe/poll trick for sync
between threads, but fortunately gnulib now provides this on Windows
too, so there's no compatibility problem there.

 libvirt_private.syms |    1 
 remote_internal.c    | 1527 ++++++++++++++++++++++++++++++++-------------------
 util.c               |   33 -
 util.h               |    2 
 4 files changed, 990 insertions(+), 573 deletions(-)

Daniel

diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -288,6 +288,7 @@ virEventAddHandle;
 virEventAddHandle;
 virEventRemoveHandle;
 virExec;
+virSetNonBlock;
 virFormatMacAddr;
 virGetHostname;
 virParseMacAddr;
diff --git a/src/remote_internal.c b/src/remote_internal.c
--- a/src/remote_internal.c
+++ b/src/remote_internal.c
@@ -67,6 +67,8 @@
 #include <libxml/uri.h>
 
 #include <netdb.h>
+
+#include <poll.h>
 
 /* AI_ADDRCONFIG is missing on some systems. */
 #ifndef AI_ADDRCONFIG
@@ -86,7 +88,43 @@
 #include "util.h"
 #include "event.h"
 
+#ifdef WIN32
+#define pipe(fds) _pipe(fds,4096, _O_BINARY)
+#endif
+
+
 static int inside_daemon = 0;
+
+struct remote_thread_call;
+
+
+enum {
+    REMOTE_MODE_WAIT_TX,
+    REMOTE_MODE_WAIT_RX,
+    REMOTE_MODE_COMPLETE,
+    REMOTE_MODE_ERROR,
+};
+/* One RPC call queued on private_data.waitDispatch (singly linked). */
+struct remote_thread_call {
+    int mode;                   /* one of the REMOTE_MODE_* values */
+
+    /* 4 byte length, followed by RPC message header+body */
+    char buffer[4 + REMOTE_MESSAGE_MAX];
+    unsigned int bufferLength;  /* number of encoded bytes to transmit */
+    unsigned int bufferOffset;  /* how much of buffer has been consumed */
+
+    unsigned int serial;        /* compared with the reply header serial */
+    unsigned int proc_nr;       /* compared with the reply header proc */
+
+    virCond cond;               /* signalled to wake the waiting thread */
+
+    xdrproc_t ret_filter;       /* XDR decoder for a REMOTE_OK reply */
+    char *ret;                  /* where ret_filter stores the reply */
+
+    remote_error err;           /* decoded when status is REMOTE_ERROR */
+
+    struct remote_thread_call *next; /* next queued call, or NULL */
+};
 
 struct private_data {
     virMutex lock;
@@ -101,12 +139,24 @@ struct private_data {
     int localUses;              /* Ref count for private data */
     char *hostname;             /* Original hostname */
     FILE *debugLog;             /* Debug remote protocol */
+
 #if HAVE_SASL
     sasl_conn_t *saslconn;      /* SASL context */
+
     const char *saslDecoded;
     unsigned int saslDecodedLength;
     unsigned int saslDecodedOffset;
-#endif
+
+    const char *saslEncoded;
+    unsigned int saslEncodedLength;
+    unsigned int saslEncodedOffset;
+#endif
+
+    /* 4 byte length, followed by RPC message header+body */
+    char buffer[4 + REMOTE_MESSAGE_MAX];
+    unsigned int bufferLength;
+    unsigned int bufferOffset;
+
     /* The list of domain event callbacks */
     virDomainEventCallbackListPtr callbackList;
     /* The queue of domain events generated
@@ -114,6 +164,11 @@ struct private_data {
     virDomainEventQueuePtr domainEvents;
     /* Timer for flushing domainEvents queue */
     int eventFlushTimer;
+
+    /* List of threads currently doing dispatch */
+    int wakeupSend;
+    int wakeupRead;
+    struct remote_thread_call *waitDispatch;
 };
 
 enum {
@@ -160,7 +215,6 @@ static void make_nonnull_storage_pool (r
 static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src);
 static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src);
 void remoteDomainEventFired(int watch, int fd, int event, void *data);
-static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr);
 static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr);
 void remoteDomainEventQueueFlush(int timer, void *opaque);
 /*----------------------------------------------------------------------*/
@@ -274,6 +328,7 @@ doRemoteOpen (virConnectPtr conn,
               virConnectAuthPtr auth ATTRIBUTE_UNUSED,
               int flags)
 {
+    int wakeup[2];
     char *transport_str = NULL;
 
     if (conn->uri) {
@@ -696,6 +751,21 @@ doRemoteOpen (virConnectPtr conn,
 
     } /* switch (transport) */
 
+    if (virSetNonBlock(priv->sock) < 0) {
+        errorf (conn, VIR_ERR_SYSTEM_ERROR,
+                _("unable to make socket non-blocking %s"),
+                strerror(errno));
+        goto failed;
+    }
+
+    if (pipe(wakeup) < 0) {
+        errorf (conn, VIR_ERR_SYSTEM_ERROR,
+                _("unable to make pipe %s"),
+                strerror(errno));
+        goto failed;
+    }
+    priv->wakeupRead = wakeup[0];
+    priv->wakeupSend = wakeup[1];
 
     /* Try and authenticate with server */
     if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1)
@@ -768,6 +838,7 @@ doRemoteOpen (virConnectPtr conn,
             DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. "
                     "continuing without events.");
             virEventRemoveHandle(priv->watch);
+            priv->watch = -1;
         }
     }
     /* Successful. */
@@ -848,6 +919,7 @@ remoteOpen (virConnectPtr conn,
     }
     remoteDriverLock(priv);
     priv->localUses = 1;
+    priv->watch = -1;
 
     if (flags & VIR_CONNECT_RO)
         rflags |= VIR_DRV_OPEN_REMOTE_RO;
@@ -1220,6 +1292,7 @@ doRemoteClose (virConnectPtr conn, struc
         virEventRemoveTimeout(priv->eventFlushTimer);
         /* Remove handle for remote events */
         virEventRemoveHandle(priv->watch);
+        priv->watch = -1;
     }
 
     /* Close socket. */
@@ -5542,12 +5615,658 @@ done:
 
 /*----------------------------------------------------------------------*/
 
-static int really_write (virConnectPtr conn, struct private_data *priv,
-                         int in_open, char *bytes, int len);
-static int really_read (virConnectPtr conn, struct private_data *priv,
-                        int in_open, char *bytes, int len);
-
-/* This function performs a remote procedure call to procedure PROC_NR.
+/* Allocate a call record for PROC_NR, give it the next serial number
+ * and XDR-encode length word + header + args into its buffer, ready
+ * for sending. Returns the new call, or NULL on failure; the caller's
+ * 'ret' buffer is only recorded in the call, never freed here. */
+static struct remote_thread_call *
+prepareCall(virConnectPtr conn,
+            struct private_data *priv,
+            int flags,
+            int proc_nr,
+            xdrproc_t args_filter, char *args,
+            xdrproc_t ret_filter, char *ret)
+{
+    XDR xdr;
+    struct remote_message_header hdr;
+    struct remote_thread_call *rv;
+
+    if (VIR_ALLOC(rv) < 0)
+        return NULL;
+
+    if (virCondInit(&rv->cond) < 0) {
+        VIR_FREE(rv);
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+               VIR_ERR_INTERNAL_ERROR,
+               _("cannot initialize condition variable"));
+        return NULL;
+    }
+
+    /* Get a unique serial number for this message. */
+    rv->serial = priv->counter++;
+    rv->proc_nr = proc_nr;
+    rv->ret_filter = ret_filter;
+    rv->ret = ret;
+
+    hdr.prog = REMOTE_PROGRAM;
+    hdr.vers = REMOTE_PROTOCOL_VERSION;
+    hdr.proc = proc_nr;
+    hdr.direction = REMOTE_CALL;
+    hdr.serial = rv->serial;
+    hdr.status = REMOTE_OK;
+
+    /* Serialise header followed by args. */
+    xdrmem_create (&xdr, rv->buffer+4, REMOTE_MESSAGE_MAX, XDR_ENCODE);
+    if (!xdr_remote_message_header (&xdr, &hdr)) {
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+               VIR_ERR_RPC, _("xdr_remote_message_header failed"));
+        goto error;
+    }
+
+    if (!(*args_filter) (&xdr, args)) {
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
+               _("marshalling args"));
+        goto error;
+    }
+
+    /* Get the length stored in buffer, plus the length word itself
+     * (always encoded in 4 bytes as per RFC 4506). */
+    rv->bufferLength = xdr_getpos (&xdr) + 4;
+    xdr_destroy (&xdr);
+
+    /* Encode the length word. */
+    xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE);
+    if (!xdr_int (&xdr, (int *)&rv->bufferLength)) {
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
+               _("xdr_int (length word)"));
+        goto error;
+    }
+    xdr_destroy (&xdr);
+
+    return rv;
+
+error:
+    xdr_destroy (&xdr);
+    VIR_FREE(rv);    /* free our call; 'ret' belongs to the caller */
+    return NULL;
+}
+
+
+
+static int
+processCallWrite(virConnectPtr conn,
+                 struct private_data *priv,
+                 int in_open /* if we are in virConnectOpen */,
+                 const char *bytes, int len)
+{
+    int ret;
+
+    if (priv->uses_tls) {
+    tls_resend:
+        ret = gnutls_record_send (priv->session, bytes, len);
+        if (ret < 0) {
+            if (ret == GNUTLS_E_INTERRUPTED)
+                goto tls_resend;
+            if (ret == GNUTLS_E_AGAIN)
+                return 0;
+
+            error (in_open ? NULL : conn,
+                   VIR_ERR_GNUTLS_ERROR, gnutls_strerror (ret));
+            return -1;
+        }
+    } else {
+    resend:
+        ret = send (priv->sock, bytes, len, 0);
+        if (ret == -1) {
+            if (errno == EINTR)
+                goto resend;
+            if (errno == EWOULDBLOCK)
+                return 0;
+
+            error (in_open ? NULL : conn,
+                   VIR_ERR_SYSTEM_ERROR, strerror (errno));
+            return -1;
+
+        }
+    }
+
+    return ret;
+}
+
+
+static int
+processCallRead(virConnectPtr conn,
+                struct private_data *priv,
+                int in_open /* if we are in virConnectOpen */,
+                char *bytes, int len)
+{
+    int ret;
+
+    if (priv->uses_tls) {
+    tls_resend:
+        ret = gnutls_record_recv (priv->session, bytes, len);
+        if (ret == GNUTLS_E_INTERRUPTED)
+            goto tls_resend;
+        if (ret == GNUTLS_E_AGAIN)
+            return 0;
+
+        /* Treat 0 == EOF as an error */
+        if (ret <= 0) {
+            if (ret < 0)
+                errorf (in_open ? NULL : conn,
+                        VIR_ERR_GNUTLS_ERROR,
+                        _("failed to read from TLS socket %s"),
+                        gnutls_strerror (ret));
+            else
+                errorf (in_open ? NULL : conn,
+                        VIR_ERR_SYSTEM_ERROR,
+                        "%s", _("server closed connection"));
+            return -1;
+        }
+    } else {
+    resend:
+        ret = recv (priv->sock, bytes, len, 0);
+        if (ret <= 0) {
+            if (ret == -1) {
+                if (errno == EINTR)
+                    goto resend;
+                if (errno == EWOULDBLOCK)
+                    return 0;
+
+                errorf (in_open ? NULL : conn,
+                        VIR_ERR_SYSTEM_ERROR,
+                        _("failed to read from socket %s"),
+                        strerror (errno));
+            } else {
+                errorf (in_open ? NULL : conn,
+                        VIR_ERR_SYSTEM_ERROR,
+                        "%s", _("server closed connection"));
+            }
+            return -1;
+        }
+    }
+
+    return ret;
+}
+
+
+static int
+processCallSendOne(virConnectPtr conn,
+                   struct private_data *priv,
+                   int in_open,
+                   struct remote_thread_call *thecall)
+{
+#if HAVE_SASL
+    if (priv->saslconn) {
+        const char *output;
+        unsigned int outputlen;
+        int err, ret;
+
+        if (!priv->saslEncoded) {
+            err = sasl_encode(priv->saslconn,
+                              thecall->buffer + thecall->bufferOffset,
+                              thecall->bufferLength - thecall->bufferOffset,
+                              &output, &outputlen);
+            if (err != SASL_OK) {
+                return -1;
+            }
+            priv->saslEncoded = output;
+            priv->saslEncodedLength = outputlen;
+            priv->saslEncodedOffset = 0;
+
+            thecall->bufferOffset = thecall->bufferLength;
+        }
+
+        ret = processCallWrite(conn, priv, in_open,
+                               priv->saslEncoded + priv->saslEncodedOffset,
+                               priv->saslEncodedLength - priv->saslEncodedOffset);
+        if (ret < 0)
+            return ret;
+        priv->saslEncodedOffset += ret;
+
+        if (priv->saslEncodedOffset == priv->saslEncodedLength) {
+            priv->saslEncoded = NULL;
+            priv->saslEncodedOffset = priv->saslEncodedLength = 0;
+            thecall->mode = REMOTE_MODE_WAIT_RX;
+        }
+    } else {
+#endif
+        int ret;
+        ret = processCallWrite(conn, priv, in_open,
+                               thecall->buffer + thecall->bufferOffset,
+                               thecall->bufferLength - thecall->bufferOffset);
+        if (ret < 0)
+            return ret;
+        thecall->bufferOffset += ret;
+
+        if (thecall->bufferOffset == thecall->bufferLength) {
+            thecall->bufferOffset = thecall->bufferLength = 0;
+            thecall->mode = REMOTE_MODE_WAIT_RX;
+        }
+#if HAVE_SASL
+    }
+#endif
+    return 0;
+}
+
+/* Transmit queued calls, starting at the first one still in WAIT_TX. */
+static int
+processCallSend(virConnectPtr conn, struct private_data *priv,
+                int in_open) {
+    struct remote_thread_call *thecall = priv->waitDispatch;
+
+    while (thecall &&
+           thecall->mode != REMOTE_MODE_WAIT_TX)
+        thecall = thecall->next;
+
+    if (!thecall)
+        return -1; /* Shouldn't happen, but you never know... */
+
+    while (thecall) {
+        int ret = processCallSendOne(conn, priv, in_open, thecall);
+        if (ret < 0)
+            return ret;
+
+        if (thecall->mode == REMOTE_MODE_WAIT_TX)
+            return 0; /* Blocking write, go back to event loop */
+
+        thecall = thecall->next;
+    }
+
+    return 0; /* No more calls to send, all done */
+}
+
+/* Pull more bytes of the current incoming message into priv->buffer
+ * (via the SASL decode buffer when SASL is active). Returns the number
+ * of bytes added, 0 if the read would block, or -1 on error. */
+static int
+processCallRecvSome(virConnectPtr conn, struct private_data *priv,
+                    int in_open) {
+    unsigned int wantData;
+
+    /* Start by reading length word */
+    if (priv->bufferLength == 0)
+        priv->bufferLength = 4;
+
+    wantData = priv->bufferLength - priv->bufferOffset;
+
+#if HAVE_SASL
+    if (priv->saslconn) {
+        if (priv->saslDecoded == NULL) {
+            char encoded[8192];
+            unsigned int encodedLen = sizeof(encoded);
+            int ret, err;
+            ret = processCallRead(conn, priv, in_open,
+                                  encoded, encodedLen);
+            if (ret < 0)
+                return -1;
+            if (ret == 0)
+                return 0;
+
+            err = sasl_decode(priv->saslconn, encoded, ret,
+                              &priv->saslDecoded, &priv->saslDecodedLength);
+            if (err != SASL_OK) /* ret is the byte count, not the status */
+                return -1;
+            priv->saslDecodedOffset = 0;
+        }
+
+        if ((priv->saslDecodedLength - priv->saslDecodedOffset) < wantData)
+            wantData = (priv->saslDecodedLength - priv->saslDecodedOffset);
+
+        memcpy(priv->buffer + priv->bufferOffset,
+               priv->saslDecoded + priv->saslDecodedOffset,
+               wantData);
+        priv->saslDecodedOffset += wantData;
+        priv->bufferOffset += wantData;
+        if (priv->saslDecodedOffset == priv->saslDecodedLength) {
+            priv->saslDecodedOffset = priv->saslDecodedLength = 0;
+            priv->saslDecoded = NULL;
+        }
+
+        return wantData;
+    } else {
+#endif
+        int ret = processCallRead(conn, priv, in_open,
+                                  priv->buffer + priv->bufferOffset,
+                                  wantData);
+        if (ret < 0)
+            return -1;
+        if (ret == 0)
+            return 0;
+        priv->bufferOffset += ret;
+
+        return ret;
+#if HAVE_SASL
+    }
+#endif
+}
+
+
+static void
+processCallAsyncEvent(virConnectPtr conn, struct private_data *priv,
+                      int in_open,
+                      remote_message_header *hdr,
+                      XDR *xdr) {
+    /* An async message has come in while we were waiting for the
+     * response. Process it to pull it off the wire, and try again
+     */
+    DEBUG0("Encountered an event while waiting for a response");
+
+    if (in_open) {
+        DEBUG("Ignoring bogus event %d received while in open", hdr->proc);
+        return;
+    }
+
+    if (hdr->proc == REMOTE_PROC_DOMAIN_EVENT) {
+        remoteDomainQueueEvent(conn, xdr);
+        virEventUpdateTimeout(priv->eventFlushTimer, 0);
+    } else {
+        DEBUG("Unexpected event proc %d", hdr->proc);
+    }
+}
+
+static int
+processCallRecvLen(virConnectPtr conn, struct private_data *priv,
+                   int in_open) {
+    XDR xdr;
+    int len;
+
+    xdrmem_create (&xdr, priv->buffer, priv->bufferLength, XDR_DECODE);
+    if (!xdr_int (&xdr, &len)) {
+        error (in_open ? NULL : conn,
+               VIR_ERR_RPC, _("xdr_int (length word, reply)"));
+        return -1;
+    }
+    xdr_destroy (&xdr);
+
+    /* Length includes length word - adjust to real length to read. */
+    len -= 4;
+
+    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
+        error (in_open ? NULL : conn,
+               VIR_ERR_RPC, _("packet received from server too large"));
+        return -1;
+    }
+
+    /* Extend our declared buffer length and carry
+       on reading the header + payload */
+    priv->bufferLength += len;
+    DEBUG("Got length, now need %d total (%d more)", priv->bufferLength, len);
+    return 0;
+}
+
+
+static int
+processCallRecvMsg(virConnectPtr conn, struct private_data *priv,
+                   int in_open) {
+    XDR xdr;
+    struct remote_message_header hdr;
+    int len = priv->bufferLength - 4;
+    struct remote_thread_call *thecall;
+
+    /* Deserialise reply header. */
+    xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE);
+    if (!xdr_remote_message_header (&xdr, &hdr)) {
+        error (in_open ? NULL : conn,
+               VIR_ERR_RPC, _("invalid header in reply"));
+        return -1;
+    }
+
+    /* Check program, version, etc. are what we expect. */
+    if (hdr.prog != REMOTE_PROGRAM) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("unknown program (received %x, expected %x)"),
+                       hdr.prog, REMOTE_PROGRAM);
+        return -1;
+    }
+    if (hdr.vers != REMOTE_PROTOCOL_VERSION) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("unknown protocol version (received %x, expected %x)"),
+                       hdr.vers, REMOTE_PROTOCOL_VERSION);
+        return -1;
+    }
+
+    /* Async events from server need special handling */
+    if (hdr.direction == REMOTE_MESSAGE) {
+        processCallAsyncEvent(conn, priv, in_open,
+                              &hdr, &xdr);
+        xdr_destroy(&xdr);
+        return 0;
+    }
+
+    if (hdr.direction != REMOTE_REPLY) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("got unexpected RPC call %d from server"),
+                       hdr.proc);
+        xdr_destroy(&xdr);
+        return -1;
+    }
+
+    /* Ok, definitely got an RPC reply now find
+       out who's been waiting for it */
+
+    thecall = priv->waitDispatch;
+    while (thecall &&
+           thecall->serial != hdr.serial)
+        thecall = thecall->next;
+
+    if (!thecall) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("no call waiting for reply with serial %d"),
+                       hdr.serial);
+        xdr_destroy(&xdr);
+        return -1;
+    }
+
+    if (hdr.proc != thecall->proc_nr) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("unknown procedure (received %x, expected %x)"),
+                       hdr.proc, thecall->proc_nr);
+        xdr_destroy (&xdr);
+        return -1;
+    }
+
+    /* Status is either REMOTE_OK (meaning that what follows is a ret
+     * structure), or REMOTE_ERROR (and what follows is a remote_error
+     * structure).
+     */
+    switch (hdr.status) {
+    case REMOTE_OK:
+        if (!(*thecall->ret_filter) (&xdr, thecall->ret)) {
+            error (in_open ? NULL : conn, VIR_ERR_RPC,
+                   _("unmarshalling ret"));
+            return -1;
+        }
+        thecall->mode = REMOTE_MODE_COMPLETE;
+        xdr_destroy (&xdr);
+        return 0;
+
+    case REMOTE_ERROR:
+        memset (&thecall->err, 0, sizeof thecall->err);
+        if (!xdr_remote_error (&xdr, &thecall->err)) {
+            error (in_open ? NULL : conn,
+                   VIR_ERR_RPC, _("unmarshalling remote_error"));
+            return -1;
+        }
+        xdr_destroy (&xdr);
+        thecall->mode = REMOTE_MODE_ERROR;
+        return 0;
+
+    default:
+        virRaiseError (in_open ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("unknown status (received %x)"),
+                       hdr.status);
+        xdr_destroy (&xdr);
+        return -1;
+    }
+}
+
+/* Drain the socket: decode the length word, then each full message. */
+static int
+processCallRecv(virConnectPtr conn, struct private_data *priv,
+                int in_open) {
+    int ret;
+
+    /* Read as much data as is available, until the read
+     * would block (EAGAIN)
+     */
+    for (;;) {
+        DEBUG("Do %d %d", priv->bufferLength, priv->bufferOffset);
+        ret = processCallRecvSome(conn, priv, in_open);
+        DEBUG("Got %d\n", ret);
+        if (ret < 0)
+            return -1;
+        if (ret == 0)
+            return 0;  /* Blocking on read */
+
+        /* Check for completion of our goal */
+        if (priv->bufferOffset == priv->bufferLength) {
+            if (priv->bufferOffset == 4) {
+                ret = processCallRecvLen(conn, priv, in_open);
+            } else {
+                ret = processCallRecvMsg(conn, priv, in_open);
+                priv->bufferOffset = priv->bufferLength = 0;
+            }
+            if (ret < 0)
+                return -1;
+        }
+    }
+}
+
+/*
+ * Process all calls pending dispatch/receive until we get a reply to
+ * our own call, then quit and pass the buck to someone else. Expects
+ * the priv lock held (dropped only around poll). Returns 0 or -1.
+ */
+static int
+processCalls(virConnectPtr conn,
+             struct private_data *priv,
+             int in_open,
+             struct remote_thread_call *thiscall)
+{
+    struct pollfd fds[2];
+    int ret;
+
+    /* XXX weeeeeeeendows hate, perhaps gnulib poll() will work ? */
+
+    fds[0].fd = priv->sock;
+    fds[1].fd = priv->wakeupRead;
+
+    for (;;) {
+        struct remote_thread_call *tmp = priv->waitDispatch;
+        struct remote_thread_call *prev;
+        char ignore;
+
+        fds[0].events = fds[0].revents = 0;
+        fds[1].events = fds[1].revents = 0;
+
+        fds[1].events = POLLIN;
+        while (tmp) {
+            if (tmp->mode == REMOTE_MODE_WAIT_RX)
+                fds[0].events |= POLLIN;
+            if (tmp->mode == REMOTE_MODE_WAIT_TX)
+                fds[0].events |= POLLOUT;
+
+            tmp = tmp->next;
+        }
+
+        /* Release lock while poll'ing so other threads
+         * can stuff themselves on the queue */
+        remoteDriverUnlock(priv);
+
+    repoll:
+        ret = poll(fds, ARRAY_CARDINALITY(fds), -1);
+        if (ret < 0 && errno == EINTR)
+            goto repoll;
+        remoteDriverLock(priv);
+
+        if (fds[1].revents) {
+            DEBUG0("Woken up from poll by other thread");
+            saferead(priv->wakeupRead, &ignore, sizeof(ignore));
+        }
+
+        if (ret < 0) {
+            if (errno == EWOULDBLOCK)
+                continue;
+            DEBUG("Poll unexpectedly failed %d\n", errno);
+            return -1;
+        }
+
+        if (fds[0].revents & POLLOUT) {
+            if (processCallSend(conn, priv, in_open) < 0)
+                return -1;
+        }
+
+        if (fds[0].revents & POLLIN) {
+            if (processCallRecv(conn, priv, in_open) < 0)
+                return -1;
+        }
+
+        /* Iterate through waiting threads and if
+         * any are complete then tell 'em to wakeup
+         */
+        tmp = priv->waitDispatch;
+        prev = NULL;
+        while (tmp) {
+            if (tmp != thiscall &&
+                (tmp->mode == REMOTE_MODE_COMPLETE ||
+                 tmp->mode == REMOTE_MODE_ERROR)) {
+                /* Take them out of the list */
+                if (prev)
+                    prev->next = tmp->next;
+                else
+                    priv->waitDispatch = tmp->next;
+
+                /* And wake them up....
+                 * ...they won't actually wakeup until
+                 * we release our mutex a short while
+                 * later...
+                 */
+                DEBUG("Waking up sleep %d %p %p", tmp->proc_nr, tmp, priv->waitDispatch);
+                virCondSignal(&tmp->cond);
+            } else {
+                prev = tmp; /* only step past nodes that are still linked */
+            }
+            tmp = tmp->next;
+        }
+
+        /* Now see if *we* are done */
+        if (thiscall->mode == REMOTE_MODE_COMPLETE ||
+            thiscall->mode == REMOTE_MODE_ERROR) {
+            /* We're at head of the list already, so
+             * remove us
+             */
+            priv->waitDispatch = thiscall->next;
+            DEBUG("Giving up the buck %d %p %p", thiscall->proc_nr, thiscall, priv->waitDispatch);
+            /* See if someone else is still waiting
+             * and if so, then pass the buck ! */
+            if (priv->waitDispatch) {
+                DEBUG("Passing the buck to %d %p", priv->waitDispatch->proc_nr, priv->waitDispatch);
+                virCondSignal(&priv->waitDispatch->cond);
+            }
+            return 0;
+        }
+
+        if (fds[0].revents & (POLLHUP | POLLERR)) {
+            DEBUG0("Got poll hangup/error");
+            return -1;
+        }
+    }
+}
+
+/*
+ * This function performs a remote procedure call to procedure PROC_NR.
  *
  * NB. This does not free the args structure (not desirable, since you
  * often want this allocated on the stack or else it contains strings
@@ -5556,204 +6275,29 @@ static int really_read (virConnectPtr co
  *
  * NB(2). Make sure to memset (&ret, 0, sizeof ret) before calling,
  * else Bad Things will happen in the XDR code.
- */
-static int
-doCall (virConnectPtr conn, struct private_data *priv,
-        int flags /* if we are in virConnectOpen */,
-        int proc_nr,
-        xdrproc_t args_filter, char *args,
-        xdrproc_t ret_filter, char *ret)
-{
-    char buffer[REMOTE_MESSAGE_MAX];
-    char buffer2[4];
-    struct remote_message_header hdr;
-    XDR xdr;
-    int len;
-    struct remote_error rerror;
-
-    /* Get a unique serial number for this message. */
-    int serial = priv->counter++;
-
-    hdr.prog = REMOTE_PROGRAM;
-    hdr.vers = REMOTE_PROTOCOL_VERSION;
-    hdr.proc = proc_nr;
-    hdr.direction = REMOTE_CALL;
-    hdr.serial = serial;
-    hdr.status = REMOTE_OK;
-
-    /* Serialise header followed by args. */
-    xdrmem_create (&xdr, buffer, sizeof buffer, XDR_ENCODE);
-    if (!xdr_remote_message_header (&xdr, &hdr)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-               VIR_ERR_RPC, _("xdr_remote_message_header failed"));
-        return -1;
-    }
-
-    if (!(*args_filter) (&xdr, args)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
-               _("marshalling args"));
-        return -1;
-    }
-
-    /* Get the length stored in buffer. */
-    len = xdr_getpos (&xdr);
-    xdr_destroy (&xdr);
-
-    /* Length must include the length word itself (always encoded in
-     * 4 bytes as per RFC 4506).
-     */
-    len += 4;
-
-    /* Encode the length word. */
-    xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_ENCODE);
-    if (!xdr_int (&xdr, &len)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
-               _("xdr_int (length word)"));
-        return -1;
-    }
-    xdr_destroy (&xdr);
-
-    /* Send length word followed by header+args. */
-    if (really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1 ||
-        really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1)
-        return -1;
-
-retry_read:
-    /* Read and deserialise length word. */
-    if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1)
-        return -1;
-
-    xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
-    if (!xdr_int (&xdr, &len)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-               VIR_ERR_RPC, _("xdr_int (length word, reply)"));
-        return -1;
-    }
-    xdr_destroy (&xdr);
-
-    /* Length includes length word - adjust to real length to read. */
-    len -= 4;
-
-    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-               VIR_ERR_RPC, _("packet received from server too large"));
-        return -1;
-    }
-
-    /* Read reply header and what follows (either a ret or an error). */
-    if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len) == -1)
-        return -1;
-
-    /* Deserialise reply header. */
-    xdrmem_create (&xdr, buffer, len, XDR_DECODE);
-    if (!xdr_remote_message_header (&xdr, &hdr)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-               VIR_ERR_RPC, _("invalid header in reply"));
-        return -1;
-    }
-
-    /* Check program, version, etc. are what we expect. */
-    if (hdr.prog != REMOTE_PROGRAM) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                         NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown program (received %x, expected %x)"),
-                         hdr.prog, REMOTE_PROGRAM);
-        return -1;
-    }
-    if (hdr.vers != REMOTE_PROTOCOL_VERSION) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                         NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown protocol version (received %x, expected %x)"),
-                         hdr.vers, REMOTE_PROTOCOL_VERSION);
-        return -1;
-    }
-
-    if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
-        hdr.direction == REMOTE_MESSAGE) {
-        /* An async message has come in while we were waiting for the
-         * response. Process it to pull it off the wire, and try again
-         */
-        DEBUG0("Encountered an event while waiting for a response");
-
-        remoteDomainQueueEvent(conn, &xdr);
-        virEventUpdateTimeout(priv->eventFlushTimer, 0);
-
-        DEBUG0("Retrying read");
-        xdr_destroy (&xdr);
-        goto retry_read;
-    }
-    if (hdr.proc != proc_nr) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                         NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown procedure (received %x, expected %x)"),
-                         hdr.proc, proc_nr);
-        return -1;
-    }
-    if (hdr.direction != REMOTE_REPLY) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                         NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown direction (received %x, expected %x)"),
-                         hdr.direction, REMOTE_REPLY);
-        return -1;
-    }
-    if (hdr.serial != serial) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown serial (received %x, expected %x)"),
-                         hdr.serial, serial);
-        return -1;
-    }
-
-    /* Status is either REMOTE_OK (meaning that what follows is a ret
-     * structure), or REMOTE_ERROR (and what follows is a remote_error
-     * structure).
-     */
-    switch (hdr.status) {
-    case REMOTE_OK:
-        if (!(*ret_filter) (&xdr, ret)) {
-            error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
-                   _("unmarshalling ret"));
-            return -1;
-        }
-        xdr_destroy (&xdr);
-        return 0;
-
-    case REMOTE_ERROR:
-        memset (&rerror, 0, sizeof rerror);
-        if (!xdr_remote_error (&xdr, &rerror)) {
-            error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                   VIR_ERR_RPC, _("unmarshalling remote_error"));
-            return -1;
-        }
-        xdr_destroy (&xdr);
-        /* See if caller asked us to keep quiet about missing RPCs
-         * eg for interop with older servers */
-        if (flags & REMOTE_CALL_QUIET_MISSING_RPC &&
-            rerror.domain == VIR_FROM_REMOTE &&
-            rerror.code == VIR_ERR_RPC &&
-            rerror.level == VIR_ERR_ERROR &&
-            STRPREFIX(*rerror.message, "unknown procedure")) {
-            return -2;
-        }
-        server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, &rerror);
-        xdr_free ((xdrproc_t) xdr_remote_error, (char *) &rerror);
-        return -1;
-
-    default:
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown status (received %x)"),
-                         hdr.status);
-        xdr_destroy (&xdr);
-        return -1;
-    }
-}
-
-
+ *
+ * NB(3) You must have the private_data lock before calling this
+ *
+ * NB(4) This is very complicated. Due to connection cloning, multiple
+ * threads can want to use the socket at once. Obviously only one of
+ * them can. So if someone's using the socket, other threads are put
+ * to sleep on condition variables. The existing thread may completely
+ * send & receive their RPC call/reply while they're asleep. Or it
+ * may only get around to dealing with sending the call. Or it may
+ * get around to neither. So upon waking up from slumber, the other
+ * thread may or may not have more work to do.
+ *
+ * We call this dance  'passing the buck'
+ *
+ *      http://en.wikipedia.org/wiki/Passing_the_buck
+ *
+ *   "Buck passing or passing the buck is the action of transferring
+ *    responsibility or blame unto another person. It is also used as
+ *    a strategy in power politics when the actions of one country/
+ *    nation are blamed on another, providing an opportunity for war."
+ *
+ * NB(5) Don't Panic!
+ */
 static int
 call (virConnectPtr conn, struct private_data *priv,
       int flags /* if we are in virConnectOpen */,
@@ -5762,6 +6306,84 @@ call (virConnectPtr conn, struct private
       xdrproc_t ret_filter, char *ret)
 {
     int rv;
+    struct remote_thread_call *thiscall;
+
+    DEBUG("Doing call %d %p", proc_nr, priv->waitDispatch);
+    thiscall = prepareCall(conn, priv, flags, proc_nr,
+                           args_filter, args,
+                           ret_filter, ret);
+
+    if (!thiscall) {
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+               VIR_ERR_NO_MEMORY, NULL);
+        return -1;
+    }
+
+    /* Check to see if another thread is dispatching */
+    if (priv->waitDispatch) {
+        /* Stick ourselves on the end of the wait queue */
+        struct remote_thread_call *tmp = priv->waitDispatch;
+        char ignore = 1;
+        while (tmp && tmp->next)
+            tmp = tmp->next;
+        if (tmp)
+            tmp->next = thiscall;
+        else
+            priv->waitDispatch = thiscall;
+
+        /* Force other thread to wake up from poll */
+        safewrite(priv->wakeupSend, &ignore, sizeof(ignore));
+
+        DEBUG("Going to sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+        /* Go to sleep while other thread is working... */
+        if (virCondWait(&thiscall->cond, &priv->lock) < 0) {
+            if (priv->waitDispatch == thiscall) {
+                priv->waitDispatch = thiscall->next;
+            } else {
+                tmp = priv->waitDispatch;
+                while (tmp && tmp->next &&
+                       tmp->next != thiscall) {
+                    tmp = tmp->next;
+                }
+                if (tmp && tmp->next == thiscall)
+                    tmp->next = thiscall->next;
+            }
+            VIR_FREE(thiscall);
+            return -1;
+        }
+
+        DEBUG("Wokeup from sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+        /* Two reasons we can be woken up
+         *  1. Other thread has got our reply ready for us
+         *  2. Other thread is all done, and it's our turn to
+         *     be the dispatcher to finish waiting for
+         *     our reply
+         */
+        if (thiscall->mode == REMOTE_MODE_COMPLETE ||
+            thiscall->mode == REMOTE_MODE_ERROR) {
+            /*
+             * We avoided catching the buck and our reply is ready !
+             * We've already had 'thiscall' removed from the list
+             * so just need to (maybe) handle errors & free it
+             */
+            goto cleanup;
+        }
+
+        /* Grr, someone passed the buck onto us ... */
+
+    } else {
+        /* We're first to catch the buck */
+        priv->waitDispatch = thiscall;
+    }
+
+    DEBUG("We have the buck %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+    /*
+     * The buck stops here!
+     *
+     * At this point we're about to own the dispatch
+     * process...
+     */
+
     /*
      * Avoid needless wake-ups of the event loop in the
      * case where this call is being made from a different
@@ -5772,207 +6394,146 @@ call (virConnectPtr conn, struct private
     if (priv->watch >= 0)
         virEventUpdateHandle(priv->watch, 0);
 
-    rv = doCall(conn, priv,flags, proc_nr,
-                args_filter, args,
-                ret_filter, ret);
+    rv = processCalls(conn, priv,
+                      flags & REMOTE_CALL_IN_OPEN ? 1 : 0,
+                      thiscall);
 
     if (priv->watch >= 0)
         virEventUpdateHandle(priv->watch, VIR_EVENT_HANDLE_READABLE);
-    return rv;
-}
-
-static int
-really_write_buf (virConnectPtr conn, struct private_data *priv,
-                  int in_open /* if we are in virConnectOpen */,
-                  const char *bytes, int len)
-{
-    const char *p;
-    int err;
-
-    p = bytes;
-    if (priv->uses_tls) {
-        do {
-            err = gnutls_record_send (priv->session, p, len);
-            if (err < 0) {
-                if (err == GNUTLS_E_INTERRUPTED || err == GNUTLS_E_AGAIN)
-                    continue;
-                error (in_open ? NULL : conn,
-                       VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err));
-                return -1;
-            }
-            len -= err;
-            p += err;
-        }
-        while (len > 0);
-    } else {
-        do {
-            err = send (priv->sock, p, len, 0);
-            if (err == -1) {
-                if (errno == EINTR || errno == EAGAIN)
-                    continue;
-                error (in_open ? NULL : conn,
-                       VIR_ERR_SYSTEM_ERROR, strerror (errno));
-                return -1;
-            }
-            len -= err;
-            p += err;
-        }
-        while (len > 0);
-    }
-
-    return 0;
-}
-
-static int
-really_write_plain (virConnectPtr conn, struct private_data *priv,
-                    int in_open /* if we are in virConnectOpen */,
-                    char *bytes, int len)
-{
-    return really_write_buf(conn, priv, in_open, bytes, len);
-}
-
-#if HAVE_SASL
-static int
-really_write_sasl (virConnectPtr conn, struct private_data *priv,
-              int in_open /* if we are in virConnectOpen */,
-              char *bytes, int len)
-{
-    const char *output;
-    unsigned int outputlen;
-    int err;
-
-    err = sasl_encode(priv->saslconn, bytes, len, &output, &outputlen);
-    if (err != SASL_OK) {
-        return -1;
-    }
-
-    return really_write_buf(conn, priv, in_open, output, outputlen);
-}
-#endif
-
-static int
-really_write (virConnectPtr conn, struct private_data *priv,
-              int in_open /* if we are in virConnectOpen */,
-              char *bytes, int len)
-{
-#if HAVE_SASL
-    if (priv->saslconn)
-        return really_write_sasl(conn, priv, in_open, bytes, len);
-    else
-#endif
-        return really_write_plain(conn, priv, in_open, bytes, len);
-}
-
-static int
-really_read_buf (virConnectPtr conn, struct private_data *priv,
-                 int in_open /* if we are in virConnectOpen */,
-                 char *bytes, int len)
-{
-    int err;
-
-    if (priv->uses_tls) {
-    tlsreread:
-        err = gnutls_record_recv (priv->session, bytes, len);
-        if (err < 0) {
-            if (err == GNUTLS_E_INTERRUPTED)
-                goto tlsreread;
-            error (in_open ? NULL : conn,
-                   VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err));
-            return -1;
-        }
-        if (err == 0) {
-            error (in_open ? NULL : conn,
-                   VIR_ERR_RPC, _("socket closed unexpectedly"));
-            return -1;
-        }
-    } else {
-    reread:
-        err = recv (priv->sock, bytes, len, 0);
-        if (err == -1) {
-            if (errno == EINTR)
-                goto reread;
-            error (in_open ? NULL : conn,
-                   VIR_ERR_SYSTEM_ERROR, strerror (errno));
-            return -1;
-        }
-        if (err == 0) {
-            error (in_open ? NULL : conn,
-                   VIR_ERR_RPC, _("socket closed unexpectedly"));
-            return -1;
-        }
-    }
-
-    return err;
-}
-
-static int
-really_read_plain (virConnectPtr conn, struct private_data *priv,
-                   int in_open /* if we are in virConnectOpen */,
-                   char *bytes, int len)
-{
-    do {
-        int ret = really_read_buf (conn, priv, in_open, bytes, len);
-        if (ret < 0)
-            return -1;
-
-        len -= ret;
-        bytes += ret;
-    } while (len > 0);
-
-    return 0;
-}
-
-#if HAVE_SASL
-static int
-really_read_sasl (virConnectPtr conn, struct private_data *priv,
-                  int in_open /* if we are in virConnectOpen */,
-                  char *bytes, int len)
-{
-    do {
-        int want, got;
-        if (priv->saslDecoded == NULL) {
-            char encoded[8192];
-            int encodedLen = sizeof(encoded);
-            int err, ret;
-            ret = really_read_buf (conn, priv, in_open, encoded, encodedLen);
-            if (ret < 0)
-                return -1;
-
-            err = sasl_decode(priv->saslconn, encoded, ret,
-                              &priv->saslDecoded, &priv->saslDecodedLength);
-        }
-
-        got = priv->saslDecodedLength - priv->saslDecodedOffset;
-        want = len;
-        if (want > got)
-            want = got;
-
-        memcpy(bytes, priv->saslDecoded + priv->saslDecodedOffset, want);
-        priv->saslDecodedOffset += want;
-        if (priv->saslDecodedOffset == priv->saslDecodedLength) {
-            priv->saslDecoded = NULL;
-            priv->saslDecodedOffset = priv->saslDecodedLength = 0;
-        }
-        bytes += want;
-        len -= want;
-    } while (len > 0);
-
-    return 0;
-}
-#endif
-
-static int
-really_read (virConnectPtr conn, struct private_data *priv,
-             int in_open /* if we are in virConnectOpen */,
-             char *bytes, int len)
-{
-#if HAVE_SASL
-    if (priv->saslconn)
-        return really_read_sasl (conn, priv, in_open, bytes, len);
-    else
-#endif
-        return really_read_plain (conn, priv, in_open, bytes, len);
-}
+
+    if (rv < 0) {
+        VIR_FREE(thiscall);
+        return -1;
+    }
+
+cleanup:
+    DEBUG("All done with our call %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+    if (thiscall->mode == REMOTE_MODE_ERROR) {
+        /* See if caller asked us to keep quiet about missing RPCs
+         * eg for interop with older servers */
+        if (flags & REMOTE_CALL_QUIET_MISSING_RPC &&
+            thiscall->err.domain == VIR_FROM_REMOTE &&
+            thiscall->err.code == VIR_ERR_RPC &&
+            thiscall->err.level == VIR_ERR_ERROR &&
+            STRPREFIX(*thiscall->err.message, "unknown procedure")) {
+            rv = -2;
+        } else {
+            server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+                          &thiscall->err);
+            rv = -1;
+        }
+    } else {
+        rv = 0;
+    }
+    VIR_FREE(thiscall);
+    return rv;
+}
+
+/**
+ * remoteDomainReadEvent: decode a remote_domain_event_ret from @xdr and
+ * build a domain event from it. Returns NULL if decoding or the domain
+ * lookup fails. NOTE(review): ret is never xdr_free()d - possible leak?
+ */
+static virDomainEventPtr
+remoteDomainReadEvent(virConnectPtr conn, XDR *xdr)
+{
+    remote_domain_event_ret ret;
+    virDomainPtr dom;
+    virDomainEventPtr event = NULL;
+    memset (&ret, 0, sizeof ret);
+
+    /* Unmarshal the event payload; a decode failure is reported on conn */
+    if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
+        error (conn, VIR_ERR_RPC,
+               _("remoteDomainProcessEvent: unmarshalling ret"));
+        return NULL;
+    }
+
+    dom = get_nonnull_domain(conn,ret.dom);
+    if (!dom)
+        return NULL;
+
+    event = virDomainEventNewFromDom(dom, ret.event, ret.detail);
+
+    virDomainFree(dom);    /* dom was only needed to construct the event */
+    return event;
+}
+
+static void
+remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
+{
+    struct private_data *priv = conn->privateData;
+    virDomainEventPtr event;
+    /* Decode one event from xdr and append it to priv->domainEvents */
+    event = remoteDomainReadEvent(conn, xdr);
+    if (!event)
+        return;
+
+    if (virDomainEventQueuePush(priv->domainEvents,
+                                event) < 0) {
+        DEBUG0("Error adding event to queue");
+        virDomainEventFree(event);    /* not queued, so free it here */
+    }
+}
+
+/** remoteDomainEventFired:
+ *
+ * Callback monitoring the remote socket for event data; opaque is the
+ * virConnectPtr. Runs with the driver lock held from entry to 'done'.
+ */
+void
+remoteDomainEventFired(int watch,
+                       int fd,
+                       int event,
+                       void *opaque)
+{
+    virConnectPtr        conn = opaque;
+    struct private_data *priv = conn->privateData;
+
+    remoteDriverLock(priv);
+
+    /* A thread in call() is dispatching; should be impossible, but check */
+    if (priv->waitDispatch)
+        goto done;
+
+    DEBUG("Event fired %d %d %d %X", watch, fd, event, event);
+
+    if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
+         DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or "
+               "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__);
+         virEventRemoveHandle(watch);
+         priv->watch = -1;    /* so call() stops touching this watch */
+         goto done;
+    }
+
+    if (fd != priv->sock) {
+        virEventRemoveHandle(watch);
+        priv->watch = -1;    /* same as above: the watch is gone */
+        goto done;
+    }
+    /* Failures are only logged: this callback returns void */
+    if (processCallRecv(conn, priv, 0) < 0)
+        DEBUG0("Something went wrong during async message processing");
+
+done:
+    remoteDriverUnlock(priv);
+}
+
+void
+remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+    virConnectPtr conn = opaque;
+    struct private_data *priv = conn->privateData;
+
+    remoteDriverLock(priv);
+    /* Under the driver lock: dispatch priv->domainEvents to priv->callbackList */
+    virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList,
+                                virDomainEventDispatchDefaultFunc, NULL);
+    virEventUpdateTimeout(priv->eventFlushTimer, -1);  /* presumably -1 disarms the timer - confirm */
+
+    remoteDriverUnlock(priv);
+}
+
 
 /* For errors internal to this library. */
 static void
@@ -6272,161 +6833,3 @@ remoteRegister (void)
     return 0;
 }
 
-/**
- * remoteDomainReadEvent
- *
- * Read the event data off the wire
- */
-static virDomainEventPtr
-remoteDomainReadEvent(virConnectPtr conn, XDR *xdr)
-{
-    remote_domain_event_ret ret;
-    virDomainPtr dom;
-    virDomainEventPtr event = NULL;
-    memset (&ret, 0, sizeof ret);
-
-    /* unmarshall parameters, and process it*/
-    if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
-        error (conn, VIR_ERR_RPC,
-               _("remoteDomainProcessEvent: unmarshalling ret"));
-        return NULL;
-    }
-
-    dom = get_nonnull_domain(conn,ret.dom);
-    if (!dom)
-        return NULL;
-
-    event = virDomainEventNewFromDom(dom, ret.event, ret.detail);
-
-    virDomainFree(dom);
-    return event;
-}
-
-static void
-remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr)
-{
-    struct private_data *priv = conn->privateData;
-    virDomainEventPtr event;
-
-    event = remoteDomainReadEvent(conn, xdr);
-    if (!event)
-        return;
-
-    DEBUG0("Calling domain event callbacks (no queue)");
-    virDomainEventDispatch(event, priv->callbackList,
-                           virDomainEventDispatchDefaultFunc, NULL);
-    virDomainEventFree(event);
-}
-
-static void
-remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
-{
-    struct private_data *priv = conn->privateData;
-    virDomainEventPtr event;
-
-    event = remoteDomainReadEvent(conn, xdr);
-    if (!event)
-        return;
-
-    if (virDomainEventQueuePush(priv->domainEvents,
-                                event) < 0) {
-        DEBUG0("Error adding event to queue");
-        virDomainEventFree(event);
-    }
-}
-
-/** remoteDomainEventFired:
- *
- * The callback for monitoring the remote socket
- * for event data
- */
-void
-remoteDomainEventFired(int watch,
-                       int fd,
-                       int event,
-                       void *opaque)
-{
-    char buffer[REMOTE_MESSAGE_MAX];
-    char buffer2[4];
-    struct remote_message_header hdr;
-    XDR xdr;
-    int len;
-
-    virConnectPtr        conn = opaque;
-    struct private_data *priv = conn->privateData;
-
-    remoteDriverLock(priv);
-
-    DEBUG("Event fired %d %d %d %X", watch, fd, event, event);
-
-    if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
-         DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or "
-               "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__);
-         virEventRemoveHandle(watch);
-         goto done;
-    }
-
-    if (fd != priv->sock) {
-        virEventRemoveHandle(watch);
-        goto done;
-    }
-
-    /* Read and deserialise length word. */
-    if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1)
-        goto done;
-
-    xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
-    if (!xdr_int (&xdr, &len)) {
-        error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)"));
-        goto done;
-    }
-    xdr_destroy (&xdr);
-
-    /* Length includes length word - adjust to real length to read. */
-    len -= 4;
-
-    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
-        error (conn, VIR_ERR_RPC, _("packet received from server too large"));
-        goto done;
-    }
-
-    /* Read reply header and what follows (either a ret or an error). */
-    if (really_read (conn, priv, 0, buffer, len) == -1) {
-        error (conn, VIR_ERR_RPC, _("error reading buffer from memory"));
-        goto done;
-    }
-
-    /* Deserialise reply header. */
-    xdrmem_create (&xdr, buffer, len, XDR_DECODE);
-    if (!xdr_remote_message_header (&xdr, &hdr)) {
-        error (conn, VIR_ERR_RPC, _("invalid header in event firing"));
-        goto done;
-    }
-
-    if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
-        hdr.direction == REMOTE_MESSAGE) {
-        DEBUG0("Encountered an async event");
-        remoteDomainProcessEvent(conn, &xdr);
-    } else {
-        DEBUG0("invalid proc in event firing");
-        error (conn, VIR_ERR_RPC, _("invalid proc in event firing"));
-    }
-
-done:
-    remoteDriverUnlock(priv);
-}
-
-void
-remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque)
-{
-    virConnectPtr conn = opaque;
-    struct private_data *priv = conn->privateData;
-
-    remoteDriverLock(priv);
-
-    virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList,
-                                virDomainEventDispatchDefaultFunc, NULL);
-    virEventUpdateTimeout(priv->eventFlushTimer, -1);
-
-    remoteDriverUnlock(priv);
-}
diff --git a/src/util.c b/src/util.c
--- a/src/util.c
+++ b/src/util.c
@@ -34,6 +34,7 @@
 #include <poll.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <sys/ioctl.h>
 #if HAVE_SYS_WAIT_H
 #include <sys/wait.h>
 #endif
@@ -155,8 +156,28 @@ virArgvToString(const char *const *argv)
     return ret;
 }
 
+int virSetNonBlock(int fd) {    /* make fd non-blocking; 0 on success, -1 on failure */
+#ifndef WIN32
+    int flags;
+    if ((flags = fcntl(fd, F_GETFL)) < 0)
+        return -1;
+    flags |= O_NONBLOCK;    /* add to the current flags rather than replacing them */
+    if ((fcntl(fd, F_SETFL, flags)) < 0)
+        return -1;
+#else
+    unsigned long flag = 1;    /* FIONBIO argument: non-zero requests non-blocking mode */
 
-#ifndef __MINGW32__
+    /* This is actually Gnulib's replacement rpl_ioctl function.
+     * We can't call ioctlsocket directly in any case.
+     */
+    if (ioctl (fd, FIONBIO, (void *) &flag) == -1)
+        return -1;
+#endif
+    return 0;
+}
+
+
+#ifndef WIN32
 
 static int virSetCloseExec(int fd) {
     int flags;
@@ -164,16 +185,6 @@ static int virSetCloseExec(int fd) {
         return -1;
     flags |= FD_CLOEXEC;
     if ((fcntl(fd, F_SETFD, flags)) < 0)
-        return -1;
-    return 0;
-}
-
-static int virSetNonBlock(int fd) {
-    int flags;
-    if ((flags = fcntl(fd, F_GETFL)) < 0)
-        return -1;
-    flags |= O_NONBLOCK;
-    if ((fcntl(fd, F_SETFL, flags)) < 0)
         return -1;
     return 0;
 }
diff --git a/src/util.h b/src/util.h
--- a/src/util.h
+++ b/src/util.h
@@ -37,6 +37,8 @@ enum {
     VIR_EXEC_NONBLOCK = (1 << 0),
     VIR_EXEC_DAEMON = (1 << 1),
 };
+
+int virSetNonBlock(int fd);
 
 int virExec(virConnectPtr conn,
             const char *const*argv,

-- 
|: Red Hat, Engineering, London   -o-   http://people.redhat.com/berrange/ :|
|: http://libvirt.org  -o-  http://virt-manager.org  -o-  http://ovirt.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505  -o-  F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]