[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

Re: [libvirt] PATCH: 8/25: Concurrent dispatch of RPC methods



On Fri, Jan 16, 2009 at 12:11:16PM +0000, Daniel P. Berrange wrote:
> > > @@ -114,6 +164,11 @@ struct private_data {
> > >      virDomainEventQueuePtr domainEvents;
> > >      /* Timer for flushing domainEvents queue */
> > >      int eventFlushTimer;
> > > +
> > > +    /* List of threads currently doing dispatch */
> > > +    int wakeupSend;
> > > +    int wakeupRead;
> > 
> > How about appending "FD" to indicate these are file descriptors.
> > The names combined with the comment (which must apply to waitDispatch)
> > made me wonder what they represented.  Only when I saw them used
> > in safewrite /saferead calls did I get it.
> 
> Yes, good idea - and its not really a list of threads either,
> so the comment is a little misleading :-)

> > > +    /* Encode the length word. */
> > > +    xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE);
> > > +    if (!xdr_int (&xdr, (int *)&rv->bufferLength)) {
> > > +        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
> > > +               _("xdr_int (length word)"));
> > 
> > I haven't done enough xdr* work to know, and man pages
> > didn't provide an immediate answer:
> > Is there no need to call xdr_destroy on this error path?
> > I'd expect xdrmem_create to do any allocation, not xdr_int.
> > There are many like this.
> 
> Yes, the 'error:' codepath should be calling 'xdr_destroy(&xdr)'
> to ensure free'ing of memory.
> 
> > 
> > > +        goto error;
> > > +    }
> > > +    xdr_destroy (&xdr);
> > > +
> > > +    return rv;
> > > +
> > > +error:
> > > +    VIR_FREE(ret);
> > > +    return NULL;
> > 
> > The above should free rv, not ret:
> > 
> >        VIR_FREE(rv);


Here is an update with those suggested renames & bug fixes in it.

It also addresses the error reporting issue mentioned in

http://www.redhat.com/archives/libvir-list/2009-January/msg00428.html

That code should not have been using DEBUG() - it now correctly
raises a real error including the error string, not just errno.
There were two other bugs with missing error raising in the
path for sasl_encode/decode.

Everything upto this patch is committed, so this is diffed
against current CVS.

 libvirt_private.syms |    1 
 remote_internal.c    | 1539 ++++++++++++++++++++++++++++++++-------------------
 util.c               |   33 -
 util.h               |    2 
 4 files changed, 1002 insertions(+), 573 deletions(-)

Daniel

diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -290,6 +290,7 @@ virEnumToString;
 virEventAddHandle;
 virEventRemoveHandle;
 virExec;
+virSetNonBlock;
 virFormatMacAddr;
 virGetHostname;
 virParseMacAddr;
diff --git a/src/remote_internal.c b/src/remote_internal.c
--- a/src/remote_internal.c
+++ b/src/remote_internal.c
@@ -68,6 +68,8 @@
 
 #include <netdb.h>
 
+#include <poll.h>
+
 /* AI_ADDRCONFIG is missing on some systems. */
 #ifndef AI_ADDRCONFIG
 # define AI_ADDRCONFIG 0
@@ -86,8 +88,44 @@
 #include "util.h"
 #include "event.h"
 
+#ifdef WIN32
+#define pipe(fds) _pipe(fds,4096, _O_BINARY)
+#endif
+
+
 static int inside_daemon = 0;
 
+struct remote_thread_call;
+
+
+enum {
+    REMOTE_MODE_WAIT_TX,
+    REMOTE_MODE_WAIT_RX,
+    REMOTE_MODE_COMPLETE,
+    REMOTE_MODE_ERROR,
+};
+
+struct remote_thread_call {
+    int mode;
+
+    /* 4 byte length, followed by RPC message header+body */
+    char buffer[4 + REMOTE_MESSAGE_MAX];
+    unsigned int bufferLength;
+    unsigned int bufferOffset;
+
+    unsigned int serial;
+    unsigned int proc_nr;
+
+    virCond cond;
+
+    xdrproc_t ret_filter;
+    char *ret;
+
+    remote_error err;
+
+    struct remote_thread_call *next;
+};
+
 struct private_data {
     virMutex lock;
 
@@ -101,12 +139,24 @@ struct private_data {
     int localUses;              /* Ref count for private data */
     char *hostname;             /* Original hostname */
     FILE *debugLog;             /* Debug remote protocol */
+
 #if HAVE_SASL
     sasl_conn_t *saslconn;      /* SASL context */
+
     const char *saslDecoded;
     unsigned int saslDecodedLength;
     unsigned int saslDecodedOffset;
-#endif
+
+    const char *saslEncoded;
+    unsigned int saslEncodedLength;
+    unsigned int saslEncodedOffset;
+#endif
+
+    /* 4 byte length, followed by RPC message header+body */
+    char buffer[4 + REMOTE_MESSAGE_MAX];
+    unsigned int bufferLength;
+    unsigned int bufferOffset;
+
     /* The list of domain event callbacks */
     virDomainEventCallbackListPtr callbackList;
     /* The queue of domain events generated
@@ -114,6 +164,13 @@ struct private_data {
     virDomainEventQueuePtr domainEvents;
     /* Timer for flushing domainEvents queue */
     int eventFlushTimer;
+
+    /* Self-pipe to wakeup threads waiting in poll() */
+    int wakeupSendFD;
+    int wakeupReadFD;
+
+    /* List of threads currently waiting for dispatch */
+    struct remote_thread_call *waitDispatch;
 };
 
 enum {
@@ -160,7 +217,6 @@ static void make_nonnull_network (remote
 static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src);
 static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src);
 void remoteDomainEventFired(int watch, int fd, int event, void *data);
-static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr);
 static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr);
 void remoteDomainEventQueueFlush(int timer, void *opaque);
 /*----------------------------------------------------------------------*/
@@ -274,6 +330,7 @@ doRemoteOpen (virConnectPtr conn,
               virConnectAuthPtr auth ATTRIBUTE_UNUSED,
               int flags)
 {
+    int wakeupFD[2];
     char *transport_str = NULL;
 
     if (conn->uri) {
@@ -696,6 +753,21 @@ doRemoteOpen (virConnectPtr conn,
 
     } /* switch (transport) */
 
+    if (virSetNonBlock(priv->sock) < 0) {
+        errorf (conn, VIR_ERR_SYSTEM_ERROR,
+                _("unable to make socket non-blocking %s"),
+                strerror(errno));
+        goto failed;
+    }
+
+    if (pipe(wakeupFD) < 0) {
+        errorf (conn, VIR_ERR_SYSTEM_ERROR,
+                _("unable to make pipe %s"),
+                strerror(errno));
+        goto failed;
+    }
+    priv->wakeupReadFD = wakeupFD[0];
+    priv->wakeupSendFD = wakeupFD[1];
 
     /* Try and authenticate with server */
     if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1)
@@ -768,6 +840,7 @@ doRemoteOpen (virConnectPtr conn,
             DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. "
                     "continuing without events.");
             virEventRemoveHandle(priv->watch);
+            priv->watch = -1;
         }
     }
     /* Successful. */
@@ -848,6 +921,7 @@ remoteOpen (virConnectPtr conn,
     }
     remoteDriverLock(priv);
     priv->localUses = 1;
+    priv->watch = -1;
 
     if (flags & VIR_CONNECT_RO)
         rflags |= VIR_DRV_OPEN_REMOTE_RO;
@@ -1220,6 +1294,7 @@ doRemoteClose (virConnectPtr conn, struc
         virEventRemoveTimeout(priv->eventFlushTimer);
         /* Remove handle for remote events */
         virEventRemoveHandle(priv->watch);
+        priv->watch = -1;
     }
 
     /* Close socket. */
@@ -5537,12 +5612,665 @@ done:
 
 /*----------------------------------------------------------------------*/
 
-static int really_write (virConnectPtr conn, struct private_data *priv,
-                         int in_open, char *bytes, int len);
-static int really_read (virConnectPtr conn, struct private_data *priv,
-                        int in_open, char *bytes, int len);
-
-/* This function performs a remote procedure call to procedure PROC_NR.
+
+static struct remote_thread_call *
+prepareCall(virConnectPtr conn,
+            struct private_data *priv,
+            int flags,
+            int proc_nr,
+            xdrproc_t args_filter, char *args,
+            xdrproc_t ret_filter, char *ret)
+{
+    XDR xdr;
+    struct remote_message_header hdr;
+    struct remote_thread_call *rv;
+
+    if (VIR_ALLOC(rv) < 0)
+        return NULL;
+
+    if (virCondInit(&rv->cond) < 0) {
+        VIR_FREE(rv);
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+               VIR_ERR_INTERNAL_ERROR,
+               _("cannot initialize mutex"));
+        return NULL;
+    }
+
+    /* Get a unique serial number for this message. */
+    rv->serial = priv->counter++;
+    rv->proc_nr = proc_nr;
+    rv->ret_filter = ret_filter;
+    rv->ret = ret;
+
+    hdr.prog = REMOTE_PROGRAM;
+    hdr.vers = REMOTE_PROTOCOL_VERSION;
+    hdr.proc = proc_nr;
+    hdr.direction = REMOTE_CALL;
+    hdr.serial = rv->serial;
+    hdr.status = REMOTE_OK;
+
+    /* Serialise header followed by args. */
+    xdrmem_create (&xdr, rv->buffer+4, REMOTE_MESSAGE_MAX, XDR_ENCODE);
+    if (!xdr_remote_message_header (&xdr, &hdr)) {
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+               VIR_ERR_RPC, _("xdr_remote_message_header failed"));
+        goto error;
+    }
+
+    if (!(*args_filter) (&xdr, args)) {
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
+               _("marshalling args"));
+        goto error;
+    }
+
+    /* Get the length stored in buffer. */
+    rv->bufferLength = xdr_getpos (&xdr);
+    xdr_destroy (&xdr);
+
+    /* Length must include the length word itself (always encoded in
+     * 4 bytes as per RFC 4506).
+     */
+    rv->bufferLength += 4;
+
+    /* Encode the length word. */
+    xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE);
+    if (!xdr_int (&xdr, (int *)&rv->bufferLength)) {
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
+               _("xdr_int (length word)"));
+        goto error;
+    }
+    xdr_destroy (&xdr);
+
+    return rv;
+
+error:
+    xdr_destroy (&xdr);
+    VIR_FREE(rv);
+    return NULL;
+}
+
+
+
+static int
+processCallWrite(virConnectPtr conn,
+                 struct private_data *priv,
+                 int in_open /* if we are in virConnectOpen */,
+                 const char *bytes, int len)
+{
+    int ret;
+
+    if (priv->uses_tls) {
+    tls_resend:
+        ret = gnutls_record_send (priv->session, bytes, len);
+        if (ret < 0) {
+            if (ret == GNUTLS_E_INTERRUPTED)
+                goto tls_resend;
+            if (ret == GNUTLS_E_AGAIN)
+                return 0;
+
+            error (in_open ? NULL : conn,
+                   VIR_ERR_GNUTLS_ERROR, gnutls_strerror (ret));
+            return -1;
+        }
+    } else {
+    resend:
+        ret = send (priv->sock, bytes, len, 0);
+        if (ret == -1) {
+            if (errno == EINTR)
+                goto resend;
+            if (errno == EWOULDBLOCK)
+                return 0;
+
+            error (in_open ? NULL : conn,
+                   VIR_ERR_SYSTEM_ERROR, strerror (errno));
+            return -1;
+
+        }
+    }
+
+    return ret;
+}
+
+
+static int
+processCallRead(virConnectPtr conn,
+                struct private_data *priv,
+                int in_open /* if we are in virConnectOpen */,
+                char *bytes, int len)
+{
+    int ret;
+
+    if (priv->uses_tls) {
+    tls_resend:
+        ret = gnutls_record_recv (priv->session, bytes, len);
+        if (ret == GNUTLS_E_INTERRUPTED)
+            goto tls_resend;
+        if (ret == GNUTLS_E_AGAIN)
+            return 0;
+
+        /* Treat 0 == EOF as an error */
+        if (ret <= 0) {
+            if (ret < 0)
+                errorf (in_open ? NULL : conn,
+                        VIR_ERR_GNUTLS_ERROR,
+                        _("failed to read from TLS socket %s"),
+                        gnutls_strerror (ret));
+            else
+                errorf (in_open ? NULL : conn,
+                        VIR_ERR_SYSTEM_ERROR,
+                        "%s", _("server closed connection"));
+            return -1;
+        }
+    } else {
+    resend:
+        ret = recv (priv->sock, bytes, len, 0);
+        if (ret <= 0) {
+            if (ret == -1) {
+                if (errno == EINTR)
+                    goto resend;
+                if (errno == EWOULDBLOCK)
+                    return 0;
+
+                errorf (in_open ? NULL : conn,
+                        VIR_ERR_SYSTEM_ERROR,
+                        _("failed to read from socket %s"),
+                        strerror (errno));
+            } else {
+                errorf (in_open ? NULL : conn,
+                        VIR_ERR_SYSTEM_ERROR,
+                        "%s", _("server closed connection"));
+            }
+            return -1;
+        }
+    }
+
+    return ret;
+}
+
+
+static int
+processCallSendOne(virConnectPtr conn,
+                   struct private_data *priv,
+                   int in_open,
+                   struct remote_thread_call *thecall)
+{
+#if HAVE_SASL
+    if (priv->saslconn) {
+        const char *output;
+        unsigned int outputlen;
+        int err, ret;
+
+        if (!priv->saslEncoded) {
+            err = sasl_encode(priv->saslconn,
+                              thecall->buffer + thecall->bufferOffset,
+                              thecall->bufferLength - thecall->bufferOffset,
+                              &output, &outputlen);
+            if (err != SASL_OK) {
+                errorf (in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR,
+                        _("failed to encode SASL data: %s"),
+                        sasl_errstring(err, NULL, NULL));
+                return -1;
+            }
+            priv->saslEncoded = output;
+            priv->saslEncodedLength = outputlen;
+            priv->saslEncodedOffset = 0;
+
+            thecall->bufferOffset = thecall->bufferLength;
+        }
+
+        ret = processCallWrite(conn, priv, in_open,
+                               priv->saslEncoded + priv->saslEncodedOffset,
+                               priv->saslEncodedLength - priv->saslEncodedOffset);
+        if (ret < 0)
+            return ret;
+        priv->saslEncodedOffset += ret;
+
+        if (priv->saslEncodedOffset == priv->saslEncodedLength) {
+            priv->saslEncoded = NULL;
+            priv->saslEncodedOffset = priv->saslEncodedLength = 0;
+            thecall->mode = REMOTE_MODE_WAIT_RX;
+        }
+    } else {
+#endif
+        int ret;
+        ret = processCallWrite(conn, priv, in_open,
+                               thecall->buffer + thecall->bufferOffset,
+                               thecall->bufferLength - thecall->bufferOffset);
+        if (ret < 0)
+            return ret;
+        thecall->bufferOffset += ret;
+
+        if (thecall->bufferOffset == thecall->bufferLength) {
+            thecall->bufferOffset = thecall->bufferLength = 0;
+            thecall->mode = REMOTE_MODE_WAIT_RX;
+        }
+#if HAVE_SASL
+    }
+#endif
+    return 0;
+}
+
+
+static int
+processCallSend(virConnectPtr conn, struct private_data *priv,
+                int in_open) {
+    struct remote_thread_call *thecall = priv->waitDispatch;
+
+    while (thecall &&
+           thecall->mode != REMOTE_MODE_WAIT_TX)
+        thecall = thecall->next;
+
+    if (!thecall)
+        return -1; /* Shouldn't happen, but you never know... */
+
+    while (thecall) {
+        int ret = processCallSendOne(conn, priv, in_open, thecall);
+        if (ret < 0)
+            return ret;
+
+        if (thecall->mode == REMOTE_MODE_WAIT_TX)
+            return 0; /* Blocking write, to back to event loop */
+
+        thecall = thecall->next;
+    }
+
+    return 0; /* No more calls to send, all done */
+}
+
+static int
+processCallRecvSome(virConnectPtr conn, struct private_data *priv,
+                    int in_open) {
+    unsigned int wantData;
+
+    /* Start by reading length word */
+    if (priv->bufferLength == 0)
+        priv->bufferLength = 4;
+
+    wantData = priv->bufferLength - priv->bufferOffset;
+
+#if HAVE_SASL
+    if (priv->saslconn) {
+        if (priv->saslDecoded == NULL) {
+            char encoded[8192];
+            unsigned int encodedLen = sizeof(encoded);
+            int ret, err;
+            ret = processCallRead(conn, priv, in_open,
+                                  encoded, encodedLen);
+            if (ret < 0)
+                return -1;
+            if (ret == 0)
+                return 0;
+
+            err = sasl_decode(priv->saslconn, encoded, ret,
+                              &priv->saslDecoded, &priv->saslDecodedLength);
+            if (err != SASL_OK) {
+                errorf (in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR,
+                        _("failed to decode SASL data: %s"),
+                        sasl_errstring(err, NULL, NULL));
+                return -1;
+            }
+            priv->saslDecodedOffset = 0;
+        }
+
+        if ((priv->saslDecodedLength - priv->saslDecodedOffset) < wantData)
+            wantData = (priv->saslDecodedLength - priv->saslDecodedOffset);
+
+        memcpy(priv->buffer + priv->bufferOffset,
+               priv->saslDecoded + priv->saslDecodedOffset,
+               wantData);
+        priv->saslDecodedOffset += wantData;
+        priv->bufferOffset += wantData;
+        if (priv->saslDecodedOffset == priv->saslDecodedLength) {
+            priv->saslDecodedLength = priv->saslDecodedLength = 0;
+            priv->saslDecoded = NULL;
+        }
+
+        return wantData;
+    } else {
+#endif
+        int ret;
+
+        ret = processCallRead(conn, priv, in_open,
+                              priv->buffer + priv->bufferOffset,
+                              wantData);
+        if (ret < 0)
+            return -1;
+        if (ret == 0)
+            return 0;
+
+        priv->bufferOffset += ret;
+
+        return ret;
+#if HAVE_SASL
+    }
+#endif
+}
+
+
+static void
+processCallAsyncEvent(virConnectPtr conn, struct private_data *priv,
+                      int in_open,
+                      remote_message_header *hdr,
+                      XDR *xdr) {
+    /* An async message has come in while we were waiting for the
+     * response. Process it to pull it off the wire, and try again
+     */
+    DEBUG0("Encountered an event while waiting for a response");
+
+    if (in_open) {
+        DEBUG("Ignoring bogus event %d received while in open", hdr->proc);
+        return;
+    }
+
+    if (hdr->proc == REMOTE_PROC_DOMAIN_EVENT) {
+        remoteDomainQueueEvent(conn, xdr);
+        virEventUpdateTimeout(priv->eventFlushTimer, 0);
+    } else {
+        DEBUG("Unexpected event proc %d", hdr->proc);
+    }
+}
+
+static int
+processCallRecvLen(virConnectPtr conn, struct private_data *priv,
+                   int in_open) {
+    XDR xdr;
+    int len;
+
+    xdrmem_create (&xdr, priv->buffer, priv->bufferLength, XDR_DECODE);
+    if (!xdr_int (&xdr, &len)) {
+        error (in_open ? NULL : conn,
+               VIR_ERR_RPC, _("xdr_int (length word, reply)"));
+        return -1;
+    }
+    xdr_destroy (&xdr);
+
+    /* Length includes length word - adjust to real length to read. */
+    len -= 4;
+
+    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
+        error (in_open ? NULL : conn,
+               VIR_ERR_RPC, _("packet received from server too large"));
+        return -1;
+    }
+
+    /* Extend our declared buffer length and carry
+       on reading the header + payload */
+    priv->bufferLength += len;
+    DEBUG("Got length, now need %d total (%d more)", priv->bufferLength, len);
+    return 0;
+}
+
+
+static int
+processCallRecvMsg(virConnectPtr conn, struct private_data *priv,
+                   int in_open) {
+    XDR xdr;
+    struct remote_message_header hdr;
+    int len = priv->bufferLength - 4;
+    struct remote_thread_call *thecall;
+
+    /* Deserialise reply header. */
+    xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE);
+    if (!xdr_remote_message_header (&xdr, &hdr)) {
+        error (in_open ? NULL : conn,
+               VIR_ERR_RPC, _("invalid header in reply"));
+        return -1;
+    }
+
+    /* Check program, version, etc. are what we expect. */
+    if (hdr.prog != REMOTE_PROGRAM) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("unknown program (received %x, expected %x)"),
+                       hdr.prog, REMOTE_PROGRAM);
+        return -1;
+    }
+    if (hdr.vers != REMOTE_PROTOCOL_VERSION) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("unknown protocol version (received %x, expected %x)"),
+                       hdr.vers, REMOTE_PROTOCOL_VERSION);
+        return -1;
+    }
+
+    /* Async events from server need special handling */
+    if (hdr.direction == REMOTE_MESSAGE) {
+        processCallAsyncEvent(conn, priv, in_open,
+                              &hdr, &xdr);
+        xdr_destroy(&xdr);
+        return 0;
+    }
+
+    if (hdr.direction != REMOTE_REPLY) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("got unexpected RPC call %d from server"),
+                       hdr.proc);
+        xdr_destroy(&xdr);
+        return -1;
+    }
+
+    /* Ok, definitely got an RPC reply now find
+       out who's been waiting for it */
+
+    thecall = priv->waitDispatch;
+    while (thecall &&
+           thecall->serial != hdr.serial)
+        thecall = thecall->next;
+
+    if (!thecall) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("no call waiting for reply with serial %d"),
+                       hdr.serial);
+        xdr_destroy(&xdr);
+        return -1;
+    }
+
+    if (hdr.proc != thecall->proc_nr) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("unknown procedure (received %x, expected %x)"),
+                       hdr.proc, thecall->proc_nr);
+        xdr_destroy (&xdr);
+        return -1;
+    }
+
+    /* Status is either REMOTE_OK (meaning that what follows is a ret
+     * structure), or REMOTE_ERROR (and what follows is a remote_error
+     * structure).
+     */
+    switch (hdr.status) {
+    case REMOTE_OK:
+        if (!(*thecall->ret_filter) (&xdr, thecall->ret)) {
+            error (in_open ? NULL : conn, VIR_ERR_RPC,
+                   _("unmarshalling ret"));
+            return -1;
+        }
+        thecall->mode = REMOTE_MODE_COMPLETE;
+        xdr_destroy (&xdr);
+        return 0;
+
+    case REMOTE_ERROR:
+        memset (&thecall->err, 0, sizeof thecall->err);
+        if (!xdr_remote_error (&xdr, &thecall->err)) {
+            error (in_open ? NULL : conn,
+                   VIR_ERR_RPC, _("unmarshalling remote_error"));
+            return -1;
+        }
+        xdr_destroy (&xdr);
+        thecall->mode = REMOTE_MODE_ERROR;
+        return 0;
+
+    default:
+        virRaiseError (in_open ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("unknown status (received %x)"),
+                       hdr.status);
+        xdr_destroy (&xdr);
+        return -1;
+    }
+}
+
+
+static int
+processCallRecv(virConnectPtr conn, struct private_data *priv,
+                int in_open) {
+    int ret;
+
+    /* Read as much data as is available, until we get
+     * EGAIN
+     */
+    for (;;) {
+        ret = processCallRecvSome(conn, priv, in_open);
+
+        if (ret < 0)
+            return -1;
+        if (ret == 0)
+            return 0;  /* Blocking on read */
+
+        /* Check for completion of our goal */
+        if (priv->bufferOffset == priv->bufferLength) {
+            if (priv->bufferOffset == 4) {
+                ret = processCallRecvLen(conn, priv, in_open);
+            } else {
+                ret = processCallRecvMsg(conn, priv, in_open);
+                priv->bufferOffset = priv->bufferLength = 0;
+            }
+            if (ret < 0)
+                return -1;
+        }
+    }
+}
+
+/*
+ * Process all calls pending dispatch/receive until we
+ * get a reply to our own call. Then quit and pass the buck
+ * to someone else.
+ */
+static int
+processCalls(virConnectPtr conn,
+             struct private_data *priv,
+             int in_open,
+             struct remote_thread_call *thiscall)
+{
+    struct pollfd fds[2];
+    int ret;
+
+    fds[0].fd = priv->sock;
+    fds[1].fd = priv->wakeupReadFD;
+
+    for (;;) {
+        struct remote_thread_call *tmp = priv->waitDispatch;
+        struct remote_thread_call *prev;
+        char ignore;
+
+        fds[0].events = fds[0].revents = 0;
+        fds[1].events = fds[1].revents = 0;
+
+        fds[1].events = POLLIN;
+        while (tmp) {
+            if (tmp->mode == REMOTE_MODE_WAIT_RX)
+                fds[0].events |= POLLIN;
+            if (tmp->mode == REMOTE_MODE_WAIT_TX)
+                fds[0].events |= POLLOUT;
+
+            tmp = tmp->next;
+        }
+
+        /* Release lock while poll'ing so other threads
+         * can stuff themselves on the queue */
+        remoteDriverUnlock(priv);
+
+    repoll:
+        ret = poll(fds, ARRAY_CARDINALITY(fds), -1);
+        if (ret < 0 && errno == EINTR)
+            goto repoll;
+        remoteDriverLock(priv);
+
+        if (fds[1].revents) {
+            DEBUG0("Woken up from poll by other thread");
+            saferead(priv->wakeupReadFD, &ignore, sizeof(ignore));
+        }
+
+        if (ret < 0) {
+            if (errno == EWOULDBLOCK)
+                continue;
+            errorf (in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR,
+                    _("poll on socket failed %s"), strerror(errno));
+            return -1;
+        }
+
+        if (fds[0].revents & POLLOUT) {
+            if (processCallSend(conn, priv, in_open) < 0)
+                return -1;
+        }
+
+        if (fds[0].revents & POLLIN) {
+            if (processCallRecv(conn, priv, in_open) < 0)
+                return -1;
+        }
+
+        /* Iterate through waiting threads and if
+         * any are complete then tell 'em to wakeup
+         */
+        tmp = priv->waitDispatch;
+        prev = NULL;
+        while (tmp) {
+            if (tmp != thiscall &&
+                (tmp->mode == REMOTE_MODE_COMPLETE ||
+                 tmp->mode == REMOTE_MODE_ERROR)) {
+                /* Take them out of the list */
+                if (prev)
+                    prev->next = tmp->next;
+                else
+                    priv->waitDispatch = tmp->next;
+
+                /* And wake them up....
+                 * ...they won't actually wakeup until
+                 * we release our mutex a short while
+                 * later...
+                 */
+                DEBUG("Waking up sleep %d %p %p", tmp->proc_nr, tmp, priv->waitDispatch);
+                virCondSignal(&tmp->cond);
+            }
+            prev = tmp;
+            tmp = tmp->next;
+        }
+
+        /* Now see if *we* are done */
+        if (thiscall->mode == REMOTE_MODE_COMPLETE ||
+            thiscall->mode == REMOTE_MODE_ERROR) {
+            /* We're at head of the list already, so
+             * remove us
+             */
+            priv->waitDispatch = thiscall->next;
+            DEBUG("Giving up the buck %d %p %p", thiscall->proc_nr, thiscall, priv->waitDispatch);
+            /* See if someone else is still waiting
+             * and if so, then pass the buck ! */
+            if (priv->waitDispatch) {
+                DEBUG("Passing the buck to %d %p", priv->waitDispatch->proc_nr, priv->waitDispatch);
+                virCondSignal(&priv->waitDispatch->cond);
+            }
+            return 0;
+        }
+
+
+        if (fds[0].revents & (POLLHUP | POLLERR)) {
+            errorf(in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR,
+                   "%s", _("received hangup / error event on socket"));
+            return -1;
+        }
+    }
+}
+
+/*
+ * This function performs a remote procedure call to procedure PROC_NR.
  *
  * NB. This does not free the args structure (not desirable, since you
  * often want this allocated on the stack or else it contains strings
@@ -5551,204 +6279,29 @@ static int really_read (virConnectPtr co
  *
  * NB(2). Make sure to memset (&ret, 0, sizeof ret) before calling,
  * else Bad Things will happen in the XDR code.
- */
-static int
-doCall (virConnectPtr conn, struct private_data *priv,
-        int flags /* if we are in virConnectOpen */,
-        int proc_nr,
-        xdrproc_t args_filter, char *args,
-        xdrproc_t ret_filter, char *ret)
-{
-    char buffer[REMOTE_MESSAGE_MAX];
-    char buffer2[4];
-    struct remote_message_header hdr;
-    XDR xdr;
-    int len;
-    struct remote_error rerror;
-
-    /* Get a unique serial number for this message. */
-    int serial = priv->counter++;
-
-    hdr.prog = REMOTE_PROGRAM;
-    hdr.vers = REMOTE_PROTOCOL_VERSION;
-    hdr.proc = proc_nr;
-    hdr.direction = REMOTE_CALL;
-    hdr.serial = serial;
-    hdr.status = REMOTE_OK;
-
-    /* Serialise header followed by args. */
-    xdrmem_create (&xdr, buffer, sizeof buffer, XDR_ENCODE);
-    if (!xdr_remote_message_header (&xdr, &hdr)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-               VIR_ERR_RPC, _("xdr_remote_message_header failed"));
-        return -1;
-    }
-
-    if (!(*args_filter) (&xdr, args)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
-               _("marshalling args"));
-        return -1;
-    }
-
-    /* Get the length stored in buffer. */
-    len = xdr_getpos (&xdr);
-    xdr_destroy (&xdr);
-
-    /* Length must include the length word itself (always encoded in
-     * 4 bytes as per RFC 4506).
-     */
-    len += 4;
-
-    /* Encode the length word. */
-    xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_ENCODE);
-    if (!xdr_int (&xdr, &len)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
-               _("xdr_int (length word)"));
-        return -1;
-    }
-    xdr_destroy (&xdr);
-
-    /* Send length word followed by header+args. */
-    if (really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1 ||
-        really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1)
-        return -1;
-
-retry_read:
-    /* Read and deserialise length word. */
-    if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1)
-        return -1;
-
-    xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
-    if (!xdr_int (&xdr, &len)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-               VIR_ERR_RPC, _("xdr_int (length word, reply)"));
-        return -1;
-    }
-    xdr_destroy (&xdr);
-
-    /* Length includes length word - adjust to real length to read. */
-    len -= 4;
-
-    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-               VIR_ERR_RPC, _("packet received from server too large"));
-        return -1;
-    }
-
-    /* Read reply header and what follows (either a ret or an error). */
-    if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len) == -1)
-        return -1;
-
-    /* Deserialise reply header. */
-    xdrmem_create (&xdr, buffer, len, XDR_DECODE);
-    if (!xdr_remote_message_header (&xdr, &hdr)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-               VIR_ERR_RPC, _("invalid header in reply"));
-        return -1;
-    }
-
-    /* Check program, version, etc. are what we expect. */
-    if (hdr.prog != REMOTE_PROGRAM) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                         NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown program (received %x, expected %x)"),
-                         hdr.prog, REMOTE_PROGRAM);
-        return -1;
-    }
-    if (hdr.vers != REMOTE_PROTOCOL_VERSION) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                         NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown protocol version (received %x, expected %x)"),
-                         hdr.vers, REMOTE_PROTOCOL_VERSION);
-        return -1;
-    }
-
-    if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
-        hdr.direction == REMOTE_MESSAGE) {
-        /* An async message has come in while we were waiting for the
-         * response. Process it to pull it off the wire, and try again
-         */
-        DEBUG0("Encountered an event while waiting for a response");
-
-        remoteDomainQueueEvent(conn, &xdr);
-        virEventUpdateTimeout(priv->eventFlushTimer, 0);
-
-        DEBUG0("Retrying read");
-        xdr_destroy (&xdr);
-        goto retry_read;
-    }
-    if (hdr.proc != proc_nr) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                         NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown procedure (received %x, expected %x)"),
-                         hdr.proc, proc_nr);
-        return -1;
-    }
-    if (hdr.direction != REMOTE_REPLY) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                         NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown direction (received %x, expected %x)"),
-                         hdr.direction, REMOTE_REPLY);
-        return -1;
-    }
-    if (hdr.serial != serial) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown serial (received %x, expected %x)"),
-                         hdr.serial, serial);
-        return -1;
-    }
-
-    /* Status is either REMOTE_OK (meaning that what follows is a ret
-     * structure), or REMOTE_ERROR (and what follows is a remote_error
-     * structure).
-     */
-    switch (hdr.status) {
-    case REMOTE_OK:
-        if (!(*ret_filter) (&xdr, ret)) {
-            error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
-                   _("unmarshalling ret"));
-            return -1;
-        }
-        xdr_destroy (&xdr);
-        return 0;
-
-    case REMOTE_ERROR:
-        memset (&rerror, 0, sizeof rerror);
-        if (!xdr_remote_error (&xdr, &rerror)) {
-            error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                   VIR_ERR_RPC, _("unmarshalling remote_error"));
-            return -1;
-        }
-        xdr_destroy (&xdr);
-        /* See if caller asked us to keep quiet about missing RPCs
-         * eg for interop with older servers */
-        if (flags & REMOTE_CALL_QUIET_MISSING_RPC &&
-            rerror.domain == VIR_FROM_REMOTE &&
-            rerror.code == VIR_ERR_RPC &&
-            rerror.level == VIR_ERR_ERROR &&
-            STRPREFIX(*rerror.message, "unknown procedure")) {
-            return -2;
-        }
-        server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, &rerror);
-        xdr_free ((xdrproc_t) xdr_remote_error, (char *) &rerror);
-        return -1;
-
-    default:
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown status (received %x)"),
-                         hdr.status);
-        xdr_destroy (&xdr);
-        return -1;
-    }
-}
-
-
+ *
+ * NB(3) You must have the private_data lock before calling this
+ *
+ * NB(4) This is very complicated. Due to connection cloning, multiple
+ * threads can want to use the socket at once. Obviously only one of
+ * them can. So if someone's using the socket, other threads are put
+ * to sleep on condition variables. THe existing thread may completely
+ * send & receive their RPC call/reply while they're asleep. Or it
+ * may only get around to dealing with sending the call. Or it may
+ * get around to neither. So upon waking up from slumber, the other
+ * thread may or may not have more work todo.
+ *
+ * We call this dance  'passing the buck'
+ *
+ *      http://en.wikipedia.org/wiki/Passing_the_buck
+ *
+ *   "Buck passing or passing the buck is the action of transferring
+ *    responsibility or blame unto another person. It is also used as
+ *    a strategy in power politics when the actions of one country/
+ *    nation are blamed on another, providing an opportunity for war."
+ *
+ * NB(5) Don't Panic!
+ */
 static int
 call (virConnectPtr conn, struct private_data *priv,
       int flags /* if we are in virConnectOpen */,
@@ -5757,6 +6310,87 @@ call (virConnectPtr conn, struct private
       xdrproc_t ret_filter, char *ret)
 {
     int rv;
+    struct remote_thread_call *thiscall;
+
+    DEBUG("Doing call %d %p", proc_nr, priv->waitDispatch);
+    thiscall = prepareCall(conn, priv, flags, proc_nr,
+                           args_filter, args,
+                           ret_filter, ret);
+
+    if (!thiscall) {
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+               VIR_ERR_NO_MEMORY, NULL);
+        return -1;
+    }
+
+    /* Check to see if another thread is dispatching */
+    if (priv->waitDispatch) {
+        /* Stick ourselves on the end of the wait queue */
+        struct remote_thread_call *tmp = priv->waitDispatch;
+        char ignore = 1;
+        while (tmp && tmp->next)
+            tmp = tmp->next;
+        if (tmp)
+            tmp->next = thiscall;
+        else
+            priv->waitDispatch = thiscall;
+
+        /* Force other thread to wakup from poll */
+        safewrite(priv->wakeupSendFD, &ignore, sizeof(ignore));
+
+        DEBUG("Going to sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+        /* Go to sleep while other thread is working... */
+        if (virCondWait(&thiscall->cond, &priv->lock) < 0) {
+            if (priv->waitDispatch == thiscall) {
+                priv->waitDispatch = thiscall->next;
+            } else {
+                tmp = priv->waitDispatch;
+                while (tmp && tmp->next &&
+                       tmp->next != thiscall) {
+                    tmp = tmp->next;
+                }
+                if (tmp && tmp->next == thiscall)
+                    tmp->next = thiscall->next;
+            }
+            errorf(flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+                   VIR_ERR_INTERNAL_ERROR, "%s",
+                   _("failed to wait on condition"));
+            VIR_FREE(thiscall);
+            return -1;
+        }
+
+        DEBUG("Wokeup from sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+        /* Two reasons we can be woken up
+         *  1. Other thread has got our reply ready for us
+         *  2. Other thread is all done, and it is our turn to
+         *     be the dispatcher to finish waiting for
+         *     our reply
+         */
+        if (thiscall->mode == REMOTE_MODE_COMPLETE ||
+            thiscall->mode == REMOTE_MODE_ERROR) {
+            /*
+             * We avoided catching the buck and our reply is ready !
+             * We've already had 'thiscall' removed from the list
+             * so just need to (maybe) handle errors & free it
+             */
+            goto cleanup;
+        }
+
+        /* Grr, someone passed the buck onto us ... */
+
+    } else {
+        /* We're first to catch the buck */
+        priv->waitDispatch = thiscall;
+    }
+
+    DEBUG("We have the buck %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+    /*
+     * The buck stops here!
+     *
+     * At this point we're about to own the dispatch
+     * process...
+     */
+
     /*
      * Avoid needless wake-ups of the event loop in the
      * case where this call is being made from a different
@@ -5767,207 +6401,146 @@ call (virConnectPtr conn, struct private
     if (priv->watch >= 0)
         virEventUpdateHandle(priv->watch, 0);
 
-    rv = doCall(conn, priv,flags, proc_nr,
-                args_filter, args,
-                ret_filter, ret);
+    rv = processCalls(conn, priv,
+                      flags & REMOTE_CALL_IN_OPEN ? 1 : 0,
+                      thiscall);
 
     if (priv->watch >= 0)
         virEventUpdateHandle(priv->watch, VIR_EVENT_HANDLE_READABLE);
-    return rv;
-}
-
-static int
-really_write_buf (virConnectPtr conn, struct private_data *priv,
-                  int in_open /* if we are in virConnectOpen */,
-                  const char *bytes, int len)
-{
-    const char *p;
-    int err;
-
-    p = bytes;
-    if (priv->uses_tls) {
-        do {
-            err = gnutls_record_send (priv->session, p, len);
-            if (err < 0) {
-                if (err == GNUTLS_E_INTERRUPTED || err == GNUTLS_E_AGAIN)
-                    continue;
-                error (in_open ? NULL : conn,
-                       VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err));
-                return -1;
-            }
-            len -= err;
-            p += err;
-        }
-        while (len > 0);
-    } else {
-        do {
-            err = send (priv->sock, p, len, 0);
-            if (err == -1) {
-                if (errno == EINTR || errno == EAGAIN)
-                    continue;
-                error (in_open ? NULL : conn,
-                       VIR_ERR_SYSTEM_ERROR, strerror (errno));
-                return -1;
-            }
-            len -= err;
-            p += err;
-        }
-        while (len > 0);
-    }
-
-    return 0;
-}
-
-static int
-really_write_plain (virConnectPtr conn, struct private_data *priv,
-                    int in_open /* if we are in virConnectOpen */,
-                    char *bytes, int len)
-{
-    return really_write_buf(conn, priv, in_open, bytes, len);
-}
-
-#if HAVE_SASL
-static int
-really_write_sasl (virConnectPtr conn, struct private_data *priv,
-              int in_open /* if we are in virConnectOpen */,
-              char *bytes, int len)
-{
-    const char *output;
-    unsigned int outputlen;
-    int err;
-
-    err = sasl_encode(priv->saslconn, bytes, len, &output, &outputlen);
-    if (err != SASL_OK) {
-        return -1;
-    }
-
-    return really_write_buf(conn, priv, in_open, output, outputlen);
-}
-#endif
-
-static int
-really_write (virConnectPtr conn, struct private_data *priv,
-              int in_open /* if we are in virConnectOpen */,
-              char *bytes, int len)
-{
-#if HAVE_SASL
-    if (priv->saslconn)
-        return really_write_sasl(conn, priv, in_open, bytes, len);
-    else
-#endif
-        return really_write_plain(conn, priv, in_open, bytes, len);
-}
-
-static int
-really_read_buf (virConnectPtr conn, struct private_data *priv,
-                 int in_open /* if we are in virConnectOpen */,
-                 char *bytes, int len)
-{
-    int err;
-
-    if (priv->uses_tls) {
-    tlsreread:
-        err = gnutls_record_recv (priv->session, bytes, len);
-        if (err < 0) {
-            if (err == GNUTLS_E_INTERRUPTED)
-                goto tlsreread;
-            error (in_open ? NULL : conn,
-                   VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err));
-            return -1;
-        }
-        if (err == 0) {
-            error (in_open ? NULL : conn,
-                   VIR_ERR_RPC, _("socket closed unexpectedly"));
-            return -1;
-        }
-    } else {
-    reread:
-        err = recv (priv->sock, bytes, len, 0);
-        if (err == -1) {
-            if (errno == EINTR)
-                goto reread;
-            error (in_open ? NULL : conn,
-                   VIR_ERR_SYSTEM_ERROR, strerror (errno));
-            return -1;
-        }
-        if (err == 0) {
-            error (in_open ? NULL : conn,
-                   VIR_ERR_RPC, _("socket closed unexpectedly"));
-            return -1;
-        }
-    }
-
-    return err;
-}
-
-static int
-really_read_plain (virConnectPtr conn, struct private_data *priv,
-                   int in_open /* if we are in virConnectOpen */,
-                   char *bytes, int len)
-{
-    do {
-        int ret = really_read_buf (conn, priv, in_open, bytes, len);
-        if (ret < 0)
-            return -1;
-
-        len -= ret;
-        bytes += ret;
-    } while (len > 0);
-
-    return 0;
-}
-
-#if HAVE_SASL
-static int
-really_read_sasl (virConnectPtr conn, struct private_data *priv,
-                  int in_open /* if we are in virConnectOpen */,
-                  char *bytes, int len)
-{
-    do {
-        int want, got;
-        if (priv->saslDecoded == NULL) {
-            char encoded[8192];
-            int encodedLen = sizeof(encoded);
-            int err, ret;
-            ret = really_read_buf (conn, priv, in_open, encoded, encodedLen);
-            if (ret < 0)
-                return -1;
-
-            err = sasl_decode(priv->saslconn, encoded, ret,
-                              &priv->saslDecoded, &priv->saslDecodedLength);
-        }
-
-        got = priv->saslDecodedLength - priv->saslDecodedOffset;
-        want = len;
-        if (want > got)
-            want = got;
-
-        memcpy(bytes, priv->saslDecoded + priv->saslDecodedOffset, want);
-        priv->saslDecodedOffset += want;
-        if (priv->saslDecodedOffset == priv->saslDecodedLength) {
-            priv->saslDecoded = NULL;
-            priv->saslDecodedOffset = priv->saslDecodedLength = 0;
-        }
-        bytes += want;
-        len -= want;
-    } while (len > 0);
-
-    return 0;
-}
-#endif
-
-static int
-really_read (virConnectPtr conn, struct private_data *priv,
-             int in_open /* if we are in virConnectOpen */,
-             char *bytes, int len)
-{
-#if HAVE_SASL
-    if (priv->saslconn)
-        return really_read_sasl (conn, priv, in_open, bytes, len);
-    else
-#endif
-        return really_read_plain (conn, priv, in_open, bytes, len);
-}
+
+    if (rv < 0) {
+        VIR_FREE(thiscall);
+        return -1;
+    }
+
+cleanup:
+    DEBUG("All done with our call %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+    if (thiscall->mode == REMOTE_MODE_ERROR) {
+        /* See if caller asked us to keep quiet about missing RPCs
+         * eg for interop with older servers */
+        if (flags & REMOTE_CALL_QUIET_MISSING_RPC &&
+            thiscall->err.domain == VIR_FROM_REMOTE &&
+            thiscall->err.code == VIR_ERR_RPC &&
+            thiscall->err.level == VIR_ERR_ERROR &&
+            STRPREFIX(*thiscall->err.message, "unknown procedure")) {
+            rv = -2;
+        } else {
+            server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+                          &thiscall->err);
+            rv = -1;
+        }
+    } else {
+        rv = 0;
+    }
+    VIR_FREE(thiscall);
+    return rv;
+}
+
+/**
+ * remoteDomainReadEvent
+ *
+ * Read the event data off the wire
+ */
+static virDomainEventPtr
+remoteDomainReadEvent(virConnectPtr conn, XDR *xdr)
+{
+    remote_domain_event_ret ret;
+    virDomainPtr dom;
+    virDomainEventPtr event = NULL;
+    memset (&ret, 0, sizeof ret);
+
+    /* unmarshall parameters, and process it*/
+    if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
+        error (conn, VIR_ERR_RPC,
+               _("remoteDomainProcessEvent: unmarshalling ret"));
+        return NULL;
+    }
+
+    dom = get_nonnull_domain(conn,ret.dom);
+    if (!dom)
+        return NULL;
+
+    event = virDomainEventNewFromDom(dom, ret.event, ret.detail);
+
+    virDomainFree(dom);
+    return event;
+}
+
+static void
+remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
+{
+    struct private_data *priv = conn->privateData;
+    virDomainEventPtr event;
+
+    event = remoteDomainReadEvent(conn, xdr);
+    if (!event)
+        return;
+
+    if (virDomainEventQueuePush(priv->domainEvents,
+                                event) < 0) {
+        DEBUG0("Error adding event to queue");
+        virDomainEventFree(event);
+    }
+}
+
+/** remoteDomainEventFired:
+ *
+ * The callback for monitoring the remote socket
+ * for event data
+ */
+void
+remoteDomainEventFired(int watch,
+                       int fd,
+                       int event,
+                       void *opaque)
+{
+    virConnectPtr        conn = opaque;
+    struct private_data *priv = conn->privateData;
+
+    remoteDriverLock(priv);
+
+    /* This should be impossible, but it doesn't hurt to check */
+    if (priv->waitDispatch)
+        goto done;
+
+    DEBUG("Event fired %d %d %d %X", watch, fd, event, event);
+
+    if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
+         DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or "
+               "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__);
+         virEventRemoveHandle(watch);
+         priv->watch = -1;
+         goto done;
+    }
+
+    if (fd != priv->sock) {
+        virEventRemoveHandle(watch);
+        priv->watch = -1;
+        goto done;
+    }
+
+    if (processCallRecv(conn, priv, 0) < 0)
+        DEBUG0("Something went wrong during async message processing");
+
+done:
+    remoteDriverUnlock(priv);
+}
+
+void
+remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+    virConnectPtr conn = opaque;
+    struct private_data *priv = conn->privateData;
+
+    remoteDriverLock(priv);
+
+    virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList,
+                                virDomainEventDispatchDefaultFunc, NULL);
+    virEventUpdateTimeout(priv->eventFlushTimer, -1);
+
+    remoteDriverUnlock(priv);
+}
+
 
 /* For errors internal to this library. */
 static void
@@ -6267,161 +6840,3 @@ remoteRegister (void)
     return 0;
 }
 
-/**
- * remoteDomainReadEvent
- *
- * Read the event data off the wire
- */
-static virDomainEventPtr
-remoteDomainReadEvent(virConnectPtr conn, XDR *xdr)
-{
-    remote_domain_event_ret ret;
-    virDomainPtr dom;
-    virDomainEventPtr event = NULL;
-    memset (&ret, 0, sizeof ret);
-
-    /* unmarshall parameters, and process it*/
-    if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
-        error (conn, VIR_ERR_RPC,
-               _("remoteDomainProcessEvent: unmarshalling ret"));
-        return NULL;
-    }
-
-    dom = get_nonnull_domain(conn,ret.dom);
-    if (!dom)
-        return NULL;
-
-    event = virDomainEventNewFromDom(dom, ret.event, ret.detail);
-
-    virDomainFree(dom);
-    return event;
-}
-
-static void
-remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr)
-{
-    struct private_data *priv = conn->privateData;
-    virDomainEventPtr event;
-
-    event = remoteDomainReadEvent(conn, xdr);
-    if (!event)
-        return;
-
-    DEBUG0("Calling domain event callbacks (no queue)");
-    virDomainEventDispatch(event, priv->callbackList,
-                           virDomainEventDispatchDefaultFunc, NULL);
-    virDomainEventFree(event);
-}
-
-static void
-remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
-{
-    struct private_data *priv = conn->privateData;
-    virDomainEventPtr event;
-
-    event = remoteDomainReadEvent(conn, xdr);
-    if (!event)
-        return;
-
-    if (virDomainEventQueuePush(priv->domainEvents,
-                                event) < 0) {
-        DEBUG0("Error adding event to queue");
-        virDomainEventFree(event);
-    }
-}
-
-/** remoteDomainEventFired:
- *
- * The callback for monitoring the remote socket
- * for event data
- */
-void
-remoteDomainEventFired(int watch,
-                       int fd,
-                       int event,
-                       void *opaque)
-{
-    char buffer[REMOTE_MESSAGE_MAX];
-    char buffer2[4];
-    struct remote_message_header hdr;
-    XDR xdr;
-    int len;
-
-    virConnectPtr        conn = opaque;
-    struct private_data *priv = conn->privateData;
-
-    remoteDriverLock(priv);
-
-    DEBUG("Event fired %d %d %d %X", watch, fd, event, event);
-
-    if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
-         DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or "
-               "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__);
-         virEventRemoveHandle(watch);
-         goto done;
-    }
-
-    if (fd != priv->sock) {
-        virEventRemoveHandle(watch);
-        goto done;
-    }
-
-    /* Read and deserialise length word. */
-    if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1)
-        goto done;
-
-    xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
-    if (!xdr_int (&xdr, &len)) {
-        error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)"));
-        goto done;
-    }
-    xdr_destroy (&xdr);
-
-    /* Length includes length word - adjust to real length to read. */
-    len -= 4;
-
-    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
-        error (conn, VIR_ERR_RPC, _("packet received from server too large"));
-        goto done;
-    }
-
-    /* Read reply header and what follows (either a ret or an error). */
-    if (really_read (conn, priv, 0, buffer, len) == -1) {
-        error (conn, VIR_ERR_RPC, _("error reading buffer from memory"));
-        goto done;
-    }
-
-    /* Deserialise reply header. */
-    xdrmem_create (&xdr, buffer, len, XDR_DECODE);
-    if (!xdr_remote_message_header (&xdr, &hdr)) {
-        error (conn, VIR_ERR_RPC, _("invalid header in event firing"));
-        goto done;
-    }
-
-    if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
-        hdr.direction == REMOTE_MESSAGE) {
-        DEBUG0("Encountered an async event");
-        remoteDomainProcessEvent(conn, &xdr);
-    } else {
-        DEBUG0("invalid proc in event firing");
-        error (conn, VIR_ERR_RPC, _("invalid proc in event firing"));
-    }
-
-done:
-    remoteDriverUnlock(priv);
-}
-
-void
-remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque)
-{
-    virConnectPtr conn = opaque;
-    struct private_data *priv = conn->privateData;
-
-    remoteDriverLock(priv);
-
-    virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList,
-                                virDomainEventDispatchDefaultFunc, NULL);
-    virEventUpdateTimeout(priv->eventFlushTimer, -1);
-
-    remoteDriverUnlock(priv);
-}
diff --git a/src/util.c b/src/util.c
--- a/src/util.c
+++ b/src/util.c
@@ -34,6 +34,7 @@
 #include <poll.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <sys/ioctl.h>
 #if HAVE_SYS_WAIT_H
 #include <sys/wait.h>
 #endif
@@ -155,8 +156,28 @@ virArgvToString(const char *const *argv)
     return ret;
 }
 
+int virSetNonBlock(int fd) {
+#ifndef WIN32
+    int flags;
+    if ((flags = fcntl(fd, F_GETFL)) < 0)
+        return -1;
+    flags |= O_NONBLOCK;
+    if ((fcntl(fd, F_SETFL, flags)) < 0)
+        return -1;
+#else
+    unsigned long flag = 1;
 
-#ifndef __MINGW32__
+    /* This is actually Gnulib's replacement rpl_ioctl function.
+     * We can't call ioctlsocket directly in any case.
+     */
+    if (ioctl (fd, FIONBIO, (void *) &flag) == -1)
+        return -1;
+#endif
+    return 0;
+}
+
+
+#ifndef WIN32
 
 static int virSetCloseExec(int fd) {
     int flags;
@@ -168,16 +189,6 @@ static int virSetCloseExec(int fd) {
     return 0;
 }
 
-static int virSetNonBlock(int fd) {
-    int flags;
-    if ((flags = fcntl(fd, F_GETFL)) < 0)
-        return -1;
-    flags |= O_NONBLOCK;
-    if ((fcntl(fd, F_SETFL, flags)) < 0)
-        return -1;
-    return 0;
-}
-
 static int
 __virExec(virConnectPtr conn,
           const char *const*argv,
diff --git a/src/util.h b/src/util.h
--- a/src/util.h
+++ b/src/util.h
@@ -38,6 +38,8 @@ enum {
     VIR_EXEC_DAEMON = (1 << 1),
 };
 
+int virSetNonBlock(int fd);
+
 int virExec(virConnectPtr conn,
             const char *const*argv,
             const char *const*envp,

-- 
|: Red Hat, Engineering, London   -o-   http://people.redhat.com/berrange/ :|
|: http://libvirt.org  -o-  http://virt-manager.org  -o-  http://ovirt.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505  -o-  F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]