[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

Re: [libvirt] PATCH: 17/25: Concurrent client dispatch in libvirtd



On Tue, Jan 20, 2009 at 10:27:56AM +0000, Daniel P. Berrange wrote:
> > 
> > The above is dead code, since the condition can never be true.
> > It should be testing "ret < 0", not ret == -1.
> > Also, the 2nd "&&" should be "||".
> > 
> >            if (ret < 0 && (ret == GNUTLS_E_AGAIN ||
> >                            ret == GNUTLS_E_INTERRUPTED))
> >                return 0;
> 
> Yes, really not sure why I change it like this. Clearly wrong
> 
> > >          if (encodedLen < 0)
> > >              return -1;
> > >
> > > -        sasl_decode(client->saslconn, encoded, encodedLen,
> > > -                    &client->saslDecoded, &client->saslDecodedLength);
> > > +        ret = sasl_decode(client->saslconn, encoded, encodedLen,
> > > +                          &client->saslDecoded, &client->saslDecodedLength);
> > > +        if (ret != SASL_OK)
> > > +            return -1;
> > 
> > If this ever fails, it's sure be nice to log why,
> > but I don't see a strerror analog for SASL_* values.
> 
> Actually there are two options sasl_errstring / sasl_errdetail
> both giving suitable output.
> 
> > > -        if (qemudClientRead(server, client) < 0)
> > > -            return; /* Error, or blocking */
> > > +        xdrmem_create(&x, client->rx->buffer, client->rx->bufferLength, XDR_DECODE);
> > >
> > > -        if (client->bufferOffset < client->bufferLength)
> > > -            return; /* Not read enough */
> > > -
> > > -        xdrmem_create(&x, client->buffer, client->bufferLength, XDR_DECODE);
> > > -
> > > -        if (!xdr_u_int(&x, &len)) {
> > > +        if (!xdr_int(&x, &len)) {
> > >              xdr_destroy (&x);
> > >              DEBUG0("Failed to decode packet length");
> > > -            qemudDispatchClientFailure(server, client);
> > > +            qemudDispatchClientFailure(client);
> > >              return;
> > >          }
> > >          xdr_destroy (&x);
> > >
> > > +        /* Length includes the size of the length word itself */
> > > +        len -= REMOTE_MESSAGE_HEADER_XDR_LEN;
> > > +
> > >          if (len > REMOTE_MESSAGE_MAX) {
> > >              DEBUG("Packet length %u too large", len);
> > > -            qemudDispatchClientFailure(server, client);
> > > +            qemudDispatchClientFailure(client);
> > >              return;
> > >          }
> > >
> > >          /* Length include length of the length field itself, so
> > >           * check minimum size requirements */
> > > -        if (len <= REMOTE_MESSAGE_HEADER_XDR_LEN) {
> > > +        if (len <= 0) {
> > >              DEBUG("Packet length %u too small", len);
> > 
> > "len" may be negative here, so printing with %u will give a
> > misleading diagnostic.
> > Better use the original value, "len + REMOTE_MESSAGE_HEADER_XDR_LEN",
> > which is more likely to be non-negative.  Might as well use %d,
> > in case even the original value is negative.
> 
> SOmething odd here - I don't think I should have been changing it
> to signed in the first place. The original code used unsigned int
> and this is on the wire. Oddly the original client code used a 
> signed int, so we had a mis-match of client & server. I think it
> is better to fix the client though. So I'll re-visit this whole
> chunk

I have put it back to 'xdr_u_int' and fixed the client code to
match this. There's no compatability problem because neither
old or new code was sending length field thats long enough to
differ in signed vs unsigned. I also made the range checks moire
paranoid.

> 
> > >
> > > -    if (client->fd != fd)
> > > -        return;
> > > +    if (events & (VIR_EVENT_HANDLE_WRITABLE |
> > > +                  VIR_EVENT_HANDLE_READABLE)) {
> > > +        if (client->handshake) {
> > > +            qemudDispatchClientHandshake(server, client);
> > > +        } else {
> > > +            if (events & VIR_EVENT_HANDLE_WRITABLE)
> > > +                qemudDispatchClientWrite(server, client);
> > > +            if (events == VIR_EVENT_HANDLE_READABLE)
> > > +                qemudDispatchClientRead(server, client);
> > 
> > Why test with & to write, and then with "==" to read?
> > That makes it so we don't read when we've just written
> > (i.e., if both read and write bits were set).
> 
> Bug, it should be '&' and not '=='.
> 
> > > diff --git a/qemud/qemud.h b/qemud/qemud.h
> > ...
> > > +struct qemud_client_message {
> > ...
> > > +    int nrequests;
> > 
> > Logically, this should be an unsigned type.
> > But that means max_clients should be, too,
> > since they're compared, but max_clients comes
> > from the config file, which currently uses "int" (via GET_CONF_INT).
> > Maybe we need GET_CONF_UINT?  I wonder if a bogus config "int"
> > value of 2^32 or 2^64 maps back to 0.
> 
> Checking for 2^32/64 is not really helpful in this context, because both
> are totally inappropriate for nrequests - we should check for a more
> reasonable lower limit on nrequests, perhaps in range 1 -> 100.

I've left nrequests as a signed int, because its compared to max_Clients
which is also signed & it'd be better to change them all at once, rather
than just nrequests.


Finally I've also made use of ssize_t as suggested, this also
made me notice one place where we didn't propagate the EAGAIN
case of ret=0 correctly.

Daniel

diff --git a/qemud/libvirtd.aug b/qemud/libvirtd.aug
--- a/qemud/libvirtd.aug
+++ b/qemud/libvirtd.aug
@@ -53,6 +53,8 @@ module Libvirtd =
    let processing_entry = int_entry "min_workers"
                         | int_entry "max_workers"
                         | int_entry "max_clients"
+                        | int_entry "max_requests"
+                        | int_entry "max_client_requests"
 
    let logging_entry = int_entry "log_level"
                      | str_entry "log_filters"
diff --git a/qemud/libvirtd.conf b/qemud/libvirtd.conf
--- a/qemud/libvirtd.conf
+++ b/qemud/libvirtd.conf
@@ -247,6 +247,22 @@
 #min_workers = 5
 #max_workers = 20
 
+# Total global limit on concurrent RPC calls. Should be
+# at least as large as max_workers. Beyond this, RPC requests
+# will be read into memory and queued. This directly impact
+# memory usage, currently each request requires 256 KB of
+# memory. So by default upto 5 MB of memory is used
+#
+# XXX this isn't actually enforced yet, only the per-client
+# limit is used so far
+#max_requests = 20
+
+# Limit on concurrent requests from a single client
+# connection. To avoid one client monopolizing the server
+# this should be a small fraction of the global max_requests
+# and max_workers parameter
+#max_client_requests = 5
+
 #################################################################
 #
 # Logging controls
diff --git a/qemud/qemud.c b/qemud/qemud.c
--- a/qemud/qemud.c
+++ b/qemud/qemud.c
@@ -138,6 +138,11 @@ static int min_workers = 5;
 static int max_workers = 20;
 static int max_clients = 20;
 
+/* Total number of 'in-process' RPC calls allowed across all clients */
+static int max_requests = 20;
+/* Total number of 'in-process' RPC calls allowed by a single client*/
+static int max_client_requests = 5;
+
 #define DH_BITS 1024
 
 static sig_atomic_t sig_errors = 0;
@@ -162,9 +167,37 @@ static void sig_handler(int sig, siginfo
 
 static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
 static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-static int qemudRegisterClientEvent(struct qemud_server *server,
-                                    struct qemud_client *client,
-                                    int removeFirst);
+
+
+void
+qemudClientMessageQueuePush(struct qemud_client_message **queue,
+                            struct qemud_client_message *msg)
+{
+    struct qemud_client_message *tmp = *queue;
+
+    if (tmp) {
+        while (tmp->next)
+            tmp = tmp->next;
+        tmp->next = msg;
+    } else {
+        *queue = msg;
+    }
+}
+
+static struct qemud_client_message *
+qemudClientMessageQueueServe(struct qemud_client_message **queue)
+{
+    struct qemud_client_message *tmp = *queue;
+
+    if (tmp) {
+        *queue = tmp->next;
+        tmp->next = NULL;
+    } else {
+        *queue = NULL;
+    }
+
+    return tmp;
+}
 
 static int
 remoteCheckCertFile(const char *type, const char *file)
@@ -1042,6 +1075,8 @@ remoteCheckCertificate (gnutls_session_t
 static int
 remoteCheckAccess (struct qemud_client *client)
 {
+    struct qemud_client_message *confirm;
+
     /* Verify client certificate. */
     if (remoteCheckCertificate (client->tlssession) == -1) {
         VIR_ERROR0(_("remoteCheckCertificate: "
@@ -1051,14 +1086,25 @@ remoteCheckAccess (struct qemud_client *
                           "is set so the bad certificate is ignored"));
     }
 
+    if (client->tx) {
+        VIR_INFO("%s",
+                 _("client had unexpected data pending tx after access check"));
+        return -1;
+    }
+
+    if (VIR_ALLOC(confirm) < 0)
+        return -1;
+
     /* Checks have succeeded.  Write a '\1' byte back to the client to
      * indicate this (otherwise the socket is abruptly closed).
      * (NB. The '\1' byte is sent in an encrypted record).
      */
-    client->bufferLength = 1;
-    client->bufferOffset = 0;
-    client->buffer[0] = '\1';
-    client->mode = QEMUD_MODE_TX_PACKET;
+    confirm->async = 1;
+    confirm->bufferLength = 1;
+    confirm->bufferOffset = 0;
+    confirm->buffer[0] = '\1';
+
+    client->tx = confirm;
     return 0;
 }
 
@@ -1084,6 +1130,7 @@ int qemudGetSocketIdentity(int fd, uid_t
 }
 #endif
 
+
 static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket *sock) {
     int fd;
     struct sockaddr_storage addr;
@@ -1099,7 +1146,7 @@ static int qemudDispatchServer(struct qe
     }
 
     if (server->nclients >= max_clients) {
-        VIR_ERROR0(_("Too many active clients, dropping connection"));
+        VIR_ERROR(_("Too many active clients (%d), dropping connection"), max_clients);
         close(fd);
         return -1;
     }
@@ -1137,6 +1184,12 @@ static int qemudDispatchServer(struct qe
     client->addrlen = addrlen;
     client->server = server;
 
+    /* Prepare one for packet receive */
+    if (VIR_ALLOC(client->rx) < 0)
+        goto cleanup;
+    client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
+
+
 #if HAVE_POLKIT
     /* Only do policy checks for non-root - allow root user
        through with no checks, as a fail-safe - root can easily
@@ -1158,9 +1211,7 @@ static int qemudDispatchServer(struct qe
 #endif
 
     if (client->type != QEMUD_SOCK_TYPE_TLS) {
-        client->mode = QEMUD_MODE_RX_HEADER;
-        client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
-
+        /* Plain socket, so prepare to read first message */
         if (qemudRegisterClientEvent (server, client, 0) < 0)
             goto cleanup;
     } else {
@@ -1180,12 +1231,12 @@ static int qemudDispatchServer(struct qe
             if (remoteCheckAccess (client) == -1)
                 goto cleanup;
 
+            /* Handshake & cert check OK,  so prepare to read first message */
             if (qemudRegisterClientEvent(server, client, 0) < 0)
                 goto cleanup;
         } else if (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) {
-            /* Most likely. */
-            client->mode = QEMUD_MODE_TLS_HANDSHAKE;
-            client->bufferLength = -1;
+            /* Most likely, need to do more handshake data */
+            client->handshake = 1;
 
             if (qemudRegisterClientEvent (server, client, 0) < 0)
                 goto cleanup;
@@ -1204,7 +1255,8 @@ static int qemudDispatchServer(struct qe
     if (client &&
         client->tlssession) gnutls_deinit (client->tlssession);
     close (fd);
-    free (client);
+    VIR_FREE(client->rx);
+    VIR_FREE(client);
     return -1;
 }
 
@@ -1216,8 +1268,7 @@ static int qemudDispatchServer(struct qe
  * We keep the libvirt connection open until any async
  * jobs have finished, then clean it up elsehwere
  */
-static void qemudDispatchClientFailure(struct qemud_server *server ATTRIBUTE_UNUSED,
-                                       struct qemud_client *client) {
+void qemudDispatchClientFailure(struct qemud_client *client) {
     virEventRemoveHandleImpl(client->watch);
 
     /* Deregister event delivery callback */
@@ -1242,7 +1293,7 @@ static struct qemud_client *qemudPending
     int i;
     for (i = 0 ; i < server->nclients ; i++) {
         virMutexLock(&server->clients[i]->lock);
-        if (server->clients[i]->mode == QEMUD_MODE_WAIT_DISPATCH) {
+        if (server->clients[i]->dx) {
             /* Delibrately don't unlock client - caller wants the lock */
             return server->clients[i];
         }
@@ -1256,8 +1307,9 @@ static void *qemudWorker(void *data)
     struct qemud_server *server = data;
 
     while (1) {
-        struct qemud_client *client;
-        int len;
+        struct qemud_client *client = NULL;
+        struct qemud_client_message *reply;
+
         virMutexLock(&server->lock);
         while ((client = qemudPendingJob(server)) == NULL) {
             if (virCondWait(&server->job, &server->lock) < 0) {
@@ -1268,55 +1320,75 @@ static void *qemudWorker(void *data)
         virMutexUnlock(&server->lock);
 
         /* We own a locked client now... */
-        client->mode = QEMUD_MODE_IN_DISPATCH;
         client->refs++;
 
-        if ((len = remoteDispatchClientRequest (server, client)) == 0)
-            qemudDispatchClientFailure(server, client);
+        /* Remove our message from dispatch queue while we use it */
+        reply = qemudClientMessageQueueServe(&client->dx);
 
-        /* Set up the output buffer. */
-        client->mode = QEMUD_MODE_TX_PACKET;
-        client->bufferLength = len;
-        client->bufferOffset = 0;
+        /* This function drops the lock during dispatch,
+         * and re-acquires it before returning */
+        if (remoteDispatchClientRequest (server, client, reply) < 0) {
+            VIR_FREE(reply);
+            qemudDispatchClientFailure(client);
+            client->refs--;
+            virMutexUnlock(&client->lock);
+            continue;
+        }
+
+        /* Put reply on end of tx queue to send out  */
+        qemudClientMessageQueuePush(&client->tx, reply);
 
         if (qemudRegisterClientEvent(server, client, 1) < 0)
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
 
         client->refs--;
         virMutexUnlock(&client->lock);
-        virMutexUnlock(&server->lock);
     }
 }
 
 
-static int qemudClientReadBuf(struct qemud_server *server,
-                              struct qemud_client *client,
-                              char *data, unsigned len) {
-    int ret;
+/*
+ * Read data into buffer using wire decoding (plain or TLS)
+ *
+ * Returns:
+ *   -1 on error or EOF
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static ssize_t qemudClientReadBuf(struct qemud_client *client,
+                                  char *data, ssize_t len) {
+    ssize_t ret;
+
+    if (len < 0) {
+        VIR_ERROR(_("unexpected negative length request %d"), len);
+        qemudDispatchClientFailure(client);
+        return -1;
+    }
 
     /*qemudDebug ("qemudClientRead: len = %d", len);*/
 
     if (!client->tlssession) {
-        if ((ret = read (client->fd, data, len)) <= 0) {
-            if (ret == 0 || errno != EAGAIN) {
-                if (ret != 0)
-                    VIR_ERROR(_("read: %s"), strerror (errno));
-                qemudDispatchClientFailure(server, client);
-            }
+        ret = read (client->fd, data, len);
+        if (ret == -1 && (errno == EAGAIN ||
+                          errno == EINTR))
+            return 0;
+        if (ret <= 0) {
+            if (ret != 0)
+                VIR_ERROR(_("read: %s"), strerror (errno));
+            qemudDispatchClientFailure(client);
             return -1;
         }
     } else {
         ret = gnutls_record_recv (client->tlssession, data, len);
-        if (qemudRegisterClientEvent (server, client, 1) < 0)
-            qemudDispatchClientFailure (server, client);
-        else if (ret <= 0) {
-            if (ret == 0 || (ret != GNUTLS_E_AGAIN &&
-                             ret != GNUTLS_E_INTERRUPTED)) {
-                if (ret != 0)
-                    VIR_ERROR(_("gnutls_record_recv: %s"),
-                              gnutls_strerror (ret));
-                qemudDispatchClientFailure (server, client);
-            }
+
+        if (ret < 0 && (ret == GNUTLS_E_AGAIN ||
+                        ret == GNUTLS_E_INTERRUPTED))
+            return 0;
+        if (ret <= 0) {
+            if (ret != 0)
+                VIR_ERROR(_("gnutls_record_recv: %s"),
+                          gnutls_strerror (ret));
+            qemudDispatchClientFailure(client);
             return -1;
         }
     }
@@ -1324,22 +1396,37 @@ static int qemudClientReadBuf(struct qem
     return ret;
 }
 
-static int qemudClientReadPlain(struct qemud_server *server,
-                                struct qemud_client *client) {
-    int ret;
-    ret = qemudClientReadBuf(server, client,
-                             client->buffer + client->bufferOffset,
-                             client->bufferLength - client->bufferOffset);
-    if (ret < 0)
-        return ret;
-    client->bufferOffset += ret;
-    return 0;
+/*
+ * Read data into buffer without decoding
+ *
+ * Returns:
+ *   -1 on error or EOF
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static ssize_t qemudClientReadPlain(struct qemud_client *client) {
+    ssize_t ret;
+    ret = qemudClientReadBuf(client,
+                             client->rx->buffer + client->rx->bufferOffset,
+                             client->rx->bufferLength - client->rx->bufferOffset);
+    if (ret <= 0)
+        return ret; /* -1 error, 0 eagain */
+
+    client->rx->bufferOffset += ret;
+    return ret;
 }
 
 #if HAVE_SASL
-static int qemudClientReadSASL(struct qemud_server *server,
-                               struct qemud_client *client) {
-    int got, want;
+/*
+ * Read data into buffer decoding with SASL
+ *
+ * Returns:
+ *   -1 on error or EOF
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static ssize_t qemudClientReadSASL(struct qemud_client *client) {
+    ssize_t got, want;
 
     /* We're doing a SSF data read, so now its times to ensure
      * future writes are under SSF too.
@@ -1350,166 +1437,176 @@ static int qemudClientReadSASL(struct qe
 
     /* Need to read some more data off the wire */
     if (client->saslDecoded == NULL) {
+        int ret;
         char encoded[8192];
-        int encodedLen = sizeof(encoded);
-        encodedLen = qemudClientReadBuf(server, client, encoded, encodedLen);
+        ssize_t encodedLen = sizeof(encoded);
+        encodedLen = qemudClientReadBuf(client, encoded, encodedLen);
 
-        if (encodedLen < 0)
+        if (encodedLen <= 0)
+            return encodedLen;
+
+        ret = sasl_decode(client->saslconn, encoded, encodedLen,
+                          &client->saslDecoded, &client->saslDecodedLength);
+        if (ret != SASL_OK) {
+            VIR_ERROR(_("failed to decode SASL data %s"),
+                      sasl_errstring(ret, NULL, NULL));
+            qemudDispatchClientFailure(client);
             return -1;
-
-        sasl_decode(client->saslconn, encoded, encodedLen,
-                    &client->saslDecoded, &client->saslDecodedLength);
+        }
 
         client->saslDecodedOffset = 0;
     }
 
     /* Some buffered decoded data to return now */
     got = client->saslDecodedLength - client->saslDecodedOffset;
-    want = client->bufferLength - client->bufferOffset;
+    want = client->rx->bufferLength - client->rx->bufferOffset;
 
     if (want > got)
         want = got;
 
-    memcpy(client->buffer + client->bufferOffset,
+    memcpy(client->rx->buffer + client->rx->bufferOffset,
            client->saslDecoded + client->saslDecodedOffset, want);
     client->saslDecodedOffset += want;
-    client->bufferOffset += want;
+    client->rx->bufferOffset += want;
 
     if (client->saslDecodedOffset == client->saslDecodedLength) {
         client->saslDecoded = NULL;
         client->saslDecodedOffset = client->saslDecodedLength = 0;
     }
 
-    return 0;
+    return want;
 }
 #endif
 
-static int qemudClientRead(struct qemud_server *server,
-                           struct qemud_client *client) {
+/*
+ * Read as much data off wire as possible till we fill our
+ * buffer, or would block on I/O
+ */
+static ssize_t qemudClientRead(struct qemud_client *client) {
 #if HAVE_SASL
     if (client->saslSSF & QEMUD_SASL_SSF_READ)
-        return qemudClientReadSASL(server, client);
+        return qemudClientReadSASL(client);
     else
 #endif
-        return qemudClientReadPlain(server, client);
+        return qemudClientReadPlain(client);
 }
 
 
-static void qemudDispatchClientRead(struct qemud_server *server, struct qemud_client *client) {
-    unsigned int len;
+/*
+ * Read data until we get a complete message to process
+ */
+static void qemudDispatchClientRead(struct qemud_server *server,
+                                    struct qemud_client *client) {
     /*qemudDebug ("qemudDispatchClientRead: mode = %d", client->mode);*/
 
-    switch (client->mode) {
-    case QEMUD_MODE_RX_HEADER: {
+readmore:
+    if (qemudClientRead(client) < 0)
+        return; /* Error */
+
+    if (client->rx->bufferOffset < client->rx->bufferLength)
+        return; /* Still not read enough */
+
+    /* Either done with length word header */
+    if (client->rx->bufferLength == REMOTE_MESSAGE_HEADER_XDR_LEN) {
+        unsigned int len;
         XDR x;
 
-        if (qemudClientRead(server, client) < 0)
-            return; /* Error, or blocking */
-
-        if (client->bufferOffset < client->bufferLength)
-            return; /* Not read enough */
-
-        xdrmem_create(&x, client->buffer, client->bufferLength, XDR_DECODE);
+        xdrmem_create(&x, client->rx->buffer, client->rx->bufferLength, XDR_DECODE);
 
         if (!xdr_u_int(&x, &len)) {
             xdr_destroy (&x);
             DEBUG0("Failed to decode packet length");
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
             return;
         }
         xdr_destroy (&x);
 
-        if (len > REMOTE_MESSAGE_MAX) {
-            DEBUG("Packet length %u too large", len);
-            qemudDispatchClientFailure(server, client);
+        if (len < REMOTE_MESSAGE_HEADER_XDR_LEN) {
+            DEBUG("Packet length %u too small", len);
+            qemudDispatchClientFailure(client);
             return;
         }
 
-        /* Length include length of the length field itself, so
-         * check minimum size requirements */
-        if (len <= REMOTE_MESSAGE_HEADER_XDR_LEN) {
-            DEBUG("Packet length %u too small", len);
-            qemudDispatchClientFailure(server, client);
+        /* Length includes the size of the length word itself */
+        len -= REMOTE_MESSAGE_HEADER_XDR_LEN;
+
+        if (len > REMOTE_MESSAGE_MAX) {
+            DEBUG("Packet length %u too large", len);
+            qemudDispatchClientFailure(client);
             return;
         }
 
-        client->mode = QEMUD_MODE_RX_PAYLOAD;
-        client->bufferLength = len - REMOTE_MESSAGE_HEADER_XDR_LEN;
-        client->bufferOffset = 0;
+        /* Prepare to read rest of message */
+        client->rx->bufferLength += len;
 
         if (qemudRegisterClientEvent(server, client, 1) < 0) {
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
             return;
         }
 
-        /* Fall through */
-    }
+        /* Try and read payload immediately instead of going back
+           into poll() because chances are the data is already
+           waiting for us */
+        goto readmore;
+    } else {
+        /* Move completed message to the end of the dispatch queue */
+        qemudClientMessageQueuePush(&client->dx, client->rx);
+        client->rx = NULL;
+        client->nrequests++;
 
-    case QEMUD_MODE_RX_PAYLOAD: {
-        if (qemudClientRead(server, client) < 0)
-            return; /* Error, or blocking */
+        /* Possibly need to create another receive buffer */
+        if ((client->nrequests < max_client_requests &&
+             VIR_ALLOC(client->rx) < 0)) {
+            qemudDispatchClientFailure(client);
+        } else {
+            if (client->rx)
+                client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
 
-        if (client->bufferOffset < client->bufferLength)
-            return; /* Not read enough */
-
-        client->mode = QEMUD_MODE_WAIT_DISPATCH;
-        if (qemudRegisterClientEvent(server, client, 1) < 0)
-            qemudDispatchClientFailure(server, client);
-
-        virCondSignal(&server->job);
-
-        break;
-    }
-
-    case QEMUD_MODE_TLS_HANDSHAKE: {
-        int ret;
-
-        /* Continue the handshake. */
-        ret = gnutls_handshake (client->tlssession);
-        if (ret == 0) {
-            /* Finished.  Next step is to check the certificate. */
-            if (remoteCheckAccess (client) == -1)
-                qemudDispatchClientFailure (server, client);
-            else if (qemudRegisterClientEvent (server, client, 1) < 0)
-                qemudDispatchClientFailure (server, client);
-        } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
-            VIR_ERROR(_("TLS handshake failed: %s"),
-                      gnutls_strerror (ret));
-            qemudDispatchClientFailure (server, client);
-        } else {
-            if (qemudRegisterClientEvent (server ,client, 1) < 0)
-                qemudDispatchClientFailure (server, client);
+            if (qemudRegisterClientEvent(server, client, 1) < 0)
+                qemudDispatchClientFailure(client);
+            else
+                /* Tell one of the workers to get on with it... */
+                virCondSignal(&server->job);
         }
-
-        break;
-    }
-
-    default:
-        DEBUG("Got unexpected data read while in %d mode", client->mode);
-        qemudDispatchClientFailure(server, client);
     }
 }
 
 
-static int qemudClientWriteBuf(struct qemud_server *server,
-                               struct qemud_client *client,
-                               const char *data, int len) {
-    int ret;
+/*
+ * Send a chunk of data using wire encoding (plain or TLS)
+ *
+ * Returns:
+ *   -1 on error
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static ssize_t qemudClientWriteBuf(struct qemud_client *client,
+                                   const char *data, ssize_t len) {
+    ssize_t ret;
+
+    if (len < 0) {
+        VIR_ERROR(_("unexpected negative length request %d"), len);
+        qemudDispatchClientFailure(client);
+        return -1;
+    }
+
     if (!client->tlssession) {
-        if ((ret = safewrite(client->fd, data, len)) == -1) {
+        if ((ret = write(client->fd, data, len)) == -1) {
+            if (errno == EAGAIN || errno == EINTR)
+                return 0;
             VIR_ERROR(_("write: %s"), strerror (errno));
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
             return -1;
         }
     } else {
         ret = gnutls_record_send (client->tlssession, data, len);
-        if (qemudRegisterClientEvent (server, client, 1) < 0)
-            qemudDispatchClientFailure (server, client);
-        else if (ret < 0) {
-            if (ret != GNUTLS_E_INTERRUPTED && ret != GNUTLS_E_AGAIN) {
-                VIR_ERROR(_("gnutls_record_send: %s"), gnutls_strerror (ret));
-                qemudDispatchClientFailure (server, client);
-            }
+        if (ret < 0) {
+            if (ret == GNUTLS_E_INTERRUPTED ||
+                ret == GNUTLS_E_AGAIN)
+                return 0;
+
+            VIR_ERROR(_("gnutls_record_send: %s"), gnutls_strerror (ret));
+            qemudDispatchClientFailure(client);
             return -1;
         }
     }
@@ -1517,42 +1614,62 @@ static int qemudClientWriteBuf(struct qe
 }
 
 
-static int qemudClientWritePlain(struct qemud_server *server,
-                                 struct qemud_client *client) {
-    int ret = qemudClientWriteBuf(server, client,
-                                  client->buffer + client->bufferOffset,
-                                  client->bufferLength - client->bufferOffset);
-    if (ret < 0)
-        return -1;
-    client->bufferOffset += ret;
-    return 0;
+/*
+ * Send client->tx using no encoding
+ *
+ * Returns:
+ *   -1 on error or EOF
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static int qemudClientWritePlain(struct qemud_client *client) {
+    int ret = qemudClientWriteBuf(client,
+                                  client->tx->buffer + client->tx->bufferOffset,
+                                  client->tx->bufferLength - client->tx->bufferOffset);
+    if (ret <= 0)
+        return ret; /* -1 error, 0 = egain */
+    client->tx->bufferOffset += ret;
+    return ret;
 }
 
 
 #if HAVE_SASL
-static int qemudClientWriteSASL(struct qemud_server *server,
-                                struct qemud_client *client) {
+/*
+ * Send client->tx using SASL encoding
+ *
+ * Returns:
+ *   -1 on error
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static int qemudClientWriteSASL(struct qemud_client *client) {
     int ret;
 
     /* Not got any pending encoded data, so we need to encode raw stuff */
     if (client->saslEncoded == NULL) {
-        int err;
-        err = sasl_encode(client->saslconn,
-                          client->buffer + client->bufferOffset,
-                          client->bufferLength - client->bufferOffset,
+        ret = sasl_encode(client->saslconn,
+                          client->tx->buffer + client->tx->bufferOffset,
+                          client->tx->bufferLength - client->tx->bufferOffset,
                           &client->saslEncoded,
                           &client->saslEncodedLength);
 
+        if (ret != SASL_OK) {
+            VIR_ERROR(_("failed to encode SASL data %s"),
+                      sasl_errstring(ret, NULL, NULL));
+            qemudDispatchClientFailure(client);
+            return -1;
+        }
+
         client->saslEncodedOffset = 0;
     }
 
     /* Send some of the encoded stuff out on the wire */
-    ret = qemudClientWriteBuf(server, client,
+    ret = qemudClientWriteBuf(client,
                               client->saslEncoded + client->saslEncodedOffset,
                               client->saslEncodedLength - client->saslEncodedOffset);
 
-    if (ret < 0)
-        return -1;
+    if (ret <= 0)
+        return ret; /* -1 error, 0 == egain */
 
     /* Note how much we sent */
     client->saslEncodedOffset += ret;
@@ -1561,77 +1678,106 @@ static int qemudClientWriteSASL(struct q
     if (client->saslEncodedOffset == client->saslEncodedLength) {
         client->saslEncoded = NULL;
         client->saslEncodedOffset = client->saslEncodedLength = 0;
-        client->bufferOffset = client->bufferLength;
+
+        /* Mark as complete, so caller detects completion */
+        client->tx->bufferOffset = client->tx->bufferLength;
     }
 
-    return 0;
+    return ret;
 }
 #endif
 
-static int qemudClientWrite(struct qemud_server *server,
-                            struct qemud_client *client) {
+/*
+ * Send as much data in the client->tx as possible
+ *
+ * Returns:
+ *   -1 on error or EOF
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static ssize_t qemudClientWrite(struct qemud_client *client) {
 #if HAVE_SASL
     if (client->saslSSF & QEMUD_SASL_SSF_WRITE)
-        return qemudClientWriteSASL(server, client);
+        return qemudClientWriteSASL(client);
     else
 #endif
-        return qemudClientWritePlain(server, client);
+        return qemudClientWritePlain(client);
 }
 
 
-void
+/*
+ * Process all queued client->tx messages until
+ * we would block on I/O
+ */
+static void
 qemudDispatchClientWrite(struct qemud_server *server,
                          struct qemud_client *client) {
-    switch (client->mode) {
-    case QEMUD_MODE_TX_PACKET: {
-        if (qemudClientWrite(server, client) < 0)
+    while (client->tx) {
+        ssize_t ret;
+
+        ret = qemudClientWrite(client);
+        if (ret < 0) {
+            qemudDispatchClientFailure(client);
             return;
+        }
+        if (ret == 0)
+            return; /* Would block on write EAGAIN */
 
-        if (client->bufferOffset == client->bufferLength) {
-            if (client->closing) {
-                qemudDispatchClientFailure (server, client);
+        if (client->tx->bufferOffset == client->tx->bufferLength) {
+            struct qemud_client_message *reply;
+
+            /* Get finished reply from head of tx queue */
+            reply = qemudClientMessageQueueServe(&client->tx);
+
+            /* If its not an async message, then we have
+             * just completed an RPC request */
+            if (!reply->async)
+                client->nrequests--;
+
+            /* Move record to end of 'rx' ist */
+            if (!client->rx &&
+                client->nrequests < max_client_requests) {
+                /* Reset message record for next RX attempt */
+                client->rx = reply;
+                client->rx->bufferOffset = 0;
+                client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
             } else {
-                /* Done writing, switch back to receive */
-                client->mode = QEMUD_MODE_RX_HEADER;
-                client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
-                client->bufferOffset = 0;
+                VIR_FREE(reply);
+            }
 
-                if (qemudRegisterClientEvent (server, client, 1) < 0)
-                    qemudDispatchClientFailure (server, client);
-            }
-        }
-        /* Still writing */
-        break;
-    }
-
-    case QEMUD_MODE_TLS_HANDSHAKE: {
-        int ret;
-
-        /* Continue the handshake. */
-        ret = gnutls_handshake (client->tlssession);
-        if (ret == 0) {
-            /* Finished.  Next step is to check the certificate. */
-            if (remoteCheckAccess (client) == -1)
-                qemudDispatchClientFailure (server, client);
-            else if (qemudRegisterClientEvent (server, client, 1))
-                qemudDispatchClientFailure (server, client);
-        } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
-            VIR_ERROR(_("TLS handshake failed: %s"), gnutls_strerror (ret));
-            qemudDispatchClientFailure (server, client);
-        } else {
-            if (qemudRegisterClientEvent (server, client, 1))
-                qemudDispatchClientFailure (server, client);
-        }
-
-        break;
-    }
-
-    default:
-        DEBUG("Got unexpected data write while in %d mode", client->mode);
-        qemudDispatchClientFailure(server, client);
+            if (client->closing ||
+                qemudRegisterClientEvent (server, client, 1) < 0)
+                 qemudDispatchClientFailure(client);
+         }
     }
 }
 
+static void
+qemudDispatchClientHandshake(struct qemud_server *server,
+                             struct qemud_client *client) {
+    int ret;
+    /* Continue the handshake. */
+    ret = gnutls_handshake (client->tlssession);
+    if (ret == 0) {
+        /* Finished.  Next step is to check the certificate. */
+        if (remoteCheckAccess (client) == -1)
+            qemudDispatchClientFailure(client);
+        else if (qemudRegisterClientEvent (server, client, 1))
+            qemudDispatchClientFailure(client);
+    } else if (ret == GNUTLS_E_AGAIN ||
+               ret == GNUTLS_E_INTERRUPTED) {
+        /* Carry on waiting for more handshake. Update
+           the events just in case handshake data flow
+           direction has changed */
+        if (qemudRegisterClientEvent (server, client, 1))
+            qemudDispatchClientFailure(client);
+    } else {
+        /* Fatal error in handshake */
+        VIR_ERROR(_("TLS handshake failed: %s"),
+                  gnutls_strerror (ret));
+        qemudDispatchClientFailure(client);
+    }
+}
 
 static void
 qemudDispatchClientEvent(int watch, int fd, int events, void *opaque) {
@@ -1642,59 +1788,66 @@ qemudDispatchClientEvent(int watch, int 
     virMutexLock(&server->lock);
 
     for (i = 0 ; i < server->nclients ; i++) {
+        virMutexLock(&server->clients[i]->lock);
         if (server->clients[i]->watch == watch) {
             client = server->clients[i];
             break;
         }
+        virMutexUnlock(&server->clients[i]->lock);
     }
 
+    virMutexUnlock(&server->lock);
+
     if (!client) {
-        virMutexUnlock(&server->lock);
         return;
     }
 
-    virMutexLock(&client->lock);
-    virMutexUnlock(&server->lock);
+    if (client->fd != fd) {
+        virMutexUnlock(&client->lock);
+        return;
+    }
 
-    if (client->fd != fd)
-        return;
+    if (events & (VIR_EVENT_HANDLE_WRITABLE |
+                  VIR_EVENT_HANDLE_READABLE)) {
+        if (client->handshake) {
+            qemudDispatchClientHandshake(server, client);
+        } else {
+            if (events & VIR_EVENT_HANDLE_WRITABLE)
+                qemudDispatchClientWrite(server, client);
+            if (events & VIR_EVENT_HANDLE_READABLE)
+                qemudDispatchClientRead(server, client);
+        }
+    }
 
-    if (events == VIR_EVENT_HANDLE_WRITABLE)
-        qemudDispatchClientWrite(server, client);
-    else if (events == VIR_EVENT_HANDLE_READABLE)
-        qemudDispatchClientRead(server, client);
-    else
-        qemudDispatchClientFailure(server, client);
+    /* NB, will get HANGUP + READABLE at same time upon
+     * disconnect */
+    if (events & (VIR_EVENT_HANDLE_ERROR |
+                  VIR_EVENT_HANDLE_HANGUP))
+        qemudDispatchClientFailure(client);
+
     virMutexUnlock(&client->lock);
 }
 
-static int qemudRegisterClientEvent(struct qemud_server *server,
-                                    struct qemud_client *client,
-                                    int update) {
-    int mode;
-    switch (client->mode) {
-    case QEMUD_MODE_TLS_HANDSHAKE:
+int qemudRegisterClientEvent(struct qemud_server *server,
+                             struct qemud_client *client,
+                             int update) {
+    int mode = 0;
+
+    if (client->handshake) {
         if (gnutls_record_get_direction (client->tlssession) == 0)
-            mode = VIR_EVENT_HANDLE_READABLE;
+            mode |= VIR_EVENT_HANDLE_READABLE;
         else
-            mode = VIR_EVENT_HANDLE_WRITABLE;
-        break;
+            mode |= VIR_EVENT_HANDLE_WRITABLE;
+    } else {
+        /* If there is a message on the rx queue then
+         * we're wanting more input */
+        if (client->rx)
+            mode |= VIR_EVENT_HANDLE_READABLE;
 
-    case QEMUD_MODE_RX_HEADER:
-    case QEMUD_MODE_RX_PAYLOAD:
-        mode = VIR_EVENT_HANDLE_READABLE;
-        break;
-
-    case QEMUD_MODE_TX_PACKET:
-        mode = VIR_EVENT_HANDLE_WRITABLE;
-        break;
-
-    case QEMUD_MODE_WAIT_DISPATCH:
-        mode = 0;
-        break;
-
-    default:
-        return -1;
+        /* If there are one or more messages to send back to client,
+           then monitor for writability on socket */
+        if (client->tx)
+            mode |= VIR_EVENT_HANDLE_WRITABLE;
     }
 
     if (update) {
@@ -1760,6 +1913,29 @@ static void qemudInactiveTimer(int timer
     }
 }
 
+static void qemudFreeClient(struct qemud_client *client) {
+    while (client->rx) {
+        struct qemud_client_message *msg
+            = qemudClientMessageQueueServe(&client->rx);
+        VIR_FREE(msg);
+    }
+    while (client->dx) {
+        struct qemud_client_message *msg
+            = qemudClientMessageQueueServe(&client->dx);
+        VIR_FREE(msg);
+    }
+    while (client->tx) {
+        struct qemud_client_message *msg
+            = qemudClientMessageQueueServe(&client->tx);
+        VIR_FREE(msg);
+    }
+
+    if (client->conn)
+        virConnectClose(client->conn);
+    virMutexDestroy(&client->lock);
+    VIR_FREE(client);
+}
+
 static int qemudRunLoop(struct qemud_server *server) {
     int timerid = -1;
     int ret = -1, i;
@@ -1796,8 +1972,11 @@ static int qemudRunLoop(struct qemud_ser
         }
 
         virMutexUnlock(&server->lock);
-        if (qemudOneLoop() < 0)
+        if (qemudOneLoop() < 0) {
+            virMutexLock(&server->lock);
+            DEBUG0("Loop iteration error, exiting\n");
             break;
+        }
         virMutexLock(&server->lock);
 
     reprocess:
@@ -1808,17 +1987,18 @@ static int qemudRunLoop(struct qemud_ser
                 && server->clients[i]->refs == 0;
             virMutexUnlock(&server->clients[i]->lock);
             if (inactive) {
-                if (server->clients[i]->conn)
-                    virConnectClose(server->clients[i]->conn);
-                virMutexDestroy(&server->clients[i]->lock);
-                VIR_FREE(server->clients[i]);
+                qemudFreeClient(server->clients[i]);
                 server->nclients--;
-                if (i < server->nclients) {
+                if (i < server->nclients)
                     memmove(server->clients + i,
                             server->clients + i + 1,
-                            server->nclients - i);
-                    goto reprocess;
+                            sizeof (*server->clients) * (server->nclients - i));
+
+                if (VIR_REALLOC_N(server->clients,
+                                  server->nclients) < 0) {
+                    ; /* ignore */
                 }
+                goto reprocess;
             }
         }
 
@@ -1843,6 +2023,7 @@ static int qemudRunLoop(struct qemud_ser
         pthread_join(thread, NULL);
         virMutexLock(&server->lock);
     }
+    VIR_FREE(server->workers);
 
     free(server->workers);
     virMutexUnlock(&server->lock);
@@ -2223,6 +2404,9 @@ remoteReadConfigFile (struct qemud_serve
     GET_CONF_INT (conf, filename, max_workers);
     GET_CONF_INT (conf, filename, max_clients);
 
+    GET_CONF_INT (conf, filename, max_requests);
+    GET_CONF_INT (conf, filename, max_client_requests);
+
     virConfFree (conf);
     return 0;
 
diff --git a/qemud/qemud.h b/qemud/qemud.h
--- a/qemud/qemud.h
+++ b/qemud/qemud.h
@@ -65,15 +65,6 @@
 
 #define qemudDebug DEBUG
 
-enum qemud_mode {
-    QEMUD_MODE_RX_HEADER,       /* Receiving the fixed length RPC header data */
-    QEMUD_MODE_RX_PAYLOAD,      /* Receiving the variable length RPC payload data */
-    QEMUD_MODE_WAIT_DISPATCH,   /* Message received, waiting for worker to process */
-    QEMUD_MODE_IN_DISPATCH,     /* RPC call being processed */
-    QEMUD_MODE_TX_PACKET,       /* Transmitting reply to RPC call */
-    QEMUD_MODE_TLS_HANDSHAKE,   /* Performing TLS handshake */
-};
-
 /* Whether we're passing reads & writes through a sasl SSF */
 enum qemud_sasl_ssf {
     QEMUD_SASL_SSF_NONE = 0,
@@ -87,6 +78,16 @@ enum qemud_sock_type {
     QEMUD_SOCK_TYPE_TLS = 2,
 };
 
+struct qemud_client_message {
+    char buffer [REMOTE_MESSAGE_MAX + REMOTE_MESSAGE_HEADER_XDR_LEN];
+    unsigned int bufferLength;
+    unsigned int bufferOffset;
+
+    int async : 1;
+
+    struct qemud_client_message *next;
+};
+
 /* Stores the per-client connection state */
 struct qemud_client {
     virMutex lock;
@@ -97,7 +98,6 @@ struct qemud_client {
     int watch;
     int readonly:1;
     int closing:1;
-    enum qemud_mode mode;
 
     struct sockaddr_storage addr;
     socklen_t addrlen;
@@ -105,6 +105,7 @@ struct qemud_client {
     int type; /* qemud_sock_type */
     gnutls_session_t tlssession;
     int auth;
+    int handshake : 1; /* If we're in progress for TLS handshake */
 #if HAVE_SASL
     sasl_conn_t *saslconn;
     int saslSSF;
@@ -117,12 +118,20 @@ struct qemud_client {
     char *saslUsername;
 #endif
 
-    unsigned int incomingSerial;
-    unsigned int outgoingSerial;
-
-    char buffer [REMOTE_MESSAGE_MAX];
-    unsigned int bufferLength;
-    unsigned int bufferOffset;
+    /* Count of meages in 'dx' or 'tx' queue
+     * ie RPC calls in progress. Does not count
+     * async events which are not used for
+     * throttling calculations */
+    int nrequests;
+    /* Zero or one messages being received. Zero if
+     * nrequests >= max_clients and throttling */
+    struct qemud_client_message *rx;
+    /* Zero or many messages waiting for a worker
+     * to process them */
+    struct qemud_client_message *dx;
+    /* Zero or many messages waiting for transmit
+     * back to client, including async events */
+    struct qemud_client_message *tx;
 
     /* This is only valid if a remote open call has been made on this
      * connection, otherwise it will be NULL.  Also if remote close is
@@ -181,16 +190,20 @@ void qemudLog(int priority, const char *
 int qemudSetCloseExec(int fd);
 int qemudSetNonBlock(int fd);
 
-unsigned int
+int
 remoteDispatchClientRequest (struct qemud_server *server,
-                             struct qemud_client *client);
+                             struct qemud_client *client,
+                             struct qemud_client_message *req);
 
-void qemudDispatchClientWrite(struct qemud_server *server,
-                             struct qemud_client *client);
+int qemudRegisterClientEvent(struct qemud_server *server,
+                             struct qemud_client *client,
+                             int update);
 
-#if HAVE_POLKIT
-int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid);
-#endif
+void qemudDispatchClientFailure(struct qemud_client *client);
+
+void
+qemudClientMessageQueuePush(struct qemud_client_message **queue,
+                            struct qemud_client_message *msg);
 
 int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
                             virDomainPtr dom,
@@ -198,4 +211,9 @@ int remoteRelayDomainEvent (virConnectPt
                             int detail,
                             void *opaque);
 
+
+#if HAVE_POLKIT
+int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid);
 #endif
+
+#endif
diff --git a/qemud/remote.c b/qemud/remote.c
--- a/qemud/remote.c
+++ b/qemud/remote.c
@@ -111,6 +111,7 @@ static const dispatch_data const dispatc
 /* Prototypes */
 static void
 remoteDispatchDomainEventSend (struct qemud_client *client,
+                               struct qemud_client_message *msg,
                                virDomainPtr dom,
                                int event,
                                int detail);
@@ -219,9 +220,10 @@ remoteDispatchConnError (remote_error *r
  * Server object is unlocked
  * Client object is locked
  */
-unsigned int
+int
 remoteDispatchClientRequest (struct qemud_server *server,
-                             struct qemud_client *client)
+                             struct qemud_client *client,
+                             struct qemud_client_message *msg)
 {
     XDR xdr;
     remote_message_header req, rep;
@@ -229,7 +231,8 @@ remoteDispatchClientRequest (struct qemu
     dispatch_args args;
     dispatch_ret ret;
     const dispatch_data *data = NULL;
-    int rv = -1, len;
+    int rv = -1;
+    unsigned int len;
     virConnectPtr conn = NULL;
 
     memset(&args, 0, sizeof args);
@@ -237,7 +240,10 @@ remoteDispatchClientRequest (struct qemu
     memset(&rerr, 0, sizeof rerr);
 
     /* Parse the header. */
-    xdrmem_create (&xdr, client->buffer, client->bufferLength, XDR_DECODE);
+    xdrmem_create (&xdr,
+                   msg->buffer + REMOTE_MESSAGE_HEADER_XDR_LEN,
+                   msg->bufferLength - REMOTE_MESSAGE_HEADER_XDR_LEN,
+                   XDR_DECODE);
 
     if (!xdr_remote_message_header (&xdr, &req))
         goto fatal_error;
@@ -333,10 +339,10 @@ rpc_error:
     rep.status = rv < 0 ? REMOTE_ERROR : REMOTE_OK;
 
     /* Serialise the return header. */
-    xdrmem_create (&xdr, client->buffer, sizeof client->buffer, XDR_ENCODE);
+    xdrmem_create (&xdr, msg->buffer, sizeof msg->buffer, XDR_ENCODE);
 
     len = 0; /* We'll come back and write this later. */
-    if (!xdr_int (&xdr, &len)) {
+    if (!xdr_u_int (&xdr, &len)) {
         if (rv == 0) xdr_free (data->ret_filter, (char*)&ret);
         goto fatal_error;
     }
@@ -364,17 +370,21 @@ rpc_error:
     if (xdr_setpos (&xdr, 0) == 0)
         goto fatal_error;
 
-    if (!xdr_int (&xdr, &len))
+    if (!xdr_u_int (&xdr, &len))
         goto fatal_error;
 
     xdr_destroy (&xdr);
-    return len;
+
+    msg->bufferLength = len;
+    msg->bufferOffset = 0;
+
+    return 0;
 
 fatal_error:
     /* Seriously bad stuff happened, so we'll kill off this client
        and not send back any RPC error */
     xdr_destroy (&xdr);
-    return 0;
+    return -1;
 }
 
 int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
@@ -386,9 +396,20 @@ int remoteRelayDomainEvent (virConnectPt
     struct qemud_client *client = opaque;
     REMOTE_DEBUG("Relaying domain event %d %d", event, detail);
 
-    if(client) {
-        remoteDispatchDomainEventSend (client, dom, event, detail);
-        qemudDispatchClientWrite(client->server,client);
+    if (client) {
+        struct qemud_client_message *ev;
+
+        if (VIR_ALLOC(ev) < 0)
+            return -1;
+
+        virMutexLock(&client->lock);
+
+        remoteDispatchDomainEventSend (client, ev, dom, event, detail);
+
+        if (qemudRegisterClientEvent(client->server, client, 1) < 0)
+            qemudDispatchClientFailure(client);
+
+        virMutexUnlock(&client->lock);
     }
     return 0;
 }
@@ -4202,13 +4223,14 @@ remoteDispatchDomainEventsDeregister (st
 
 static void
 remoteDispatchDomainEventSend (struct qemud_client *client,
+                               struct qemud_client_message *msg,
                                virDomainPtr dom,
                                int event,
                                int detail)
 {
     remote_message_header rep;
     XDR xdr;
-    int len;
+    unsigned int len;
     remote_domain_event_ret data;
 
     if (!client)
@@ -4222,11 +4244,11 @@ remoteDispatchDomainEventSend (struct qe
     rep.status = REMOTE_OK;
 
     /* Serialise the return header and event. */
-    xdrmem_create (&xdr, client->buffer, sizeof client->buffer, XDR_ENCODE);
+    xdrmem_create (&xdr, msg->buffer, sizeof msg->buffer, XDR_ENCODE);
 
     len = 0; /* We'll come back and write this later. */
-    if (!xdr_int (&xdr, &len)) {
-        /*remoteDispatchError (client, NULL, "%s", _("xdr_int failed (1)"));*/
+    if (!xdr_u_int (&xdr, &len)) {
+        /*remoteDispatchError (client, NULL, "%s", _("xdr_u_int failed (1)"));*/
         xdr_destroy (&xdr);
         return;
     }
@@ -4254,8 +4276,8 @@ remoteDispatchDomainEventSend (struct qe
         return;
     }
 
-    if (!xdr_int (&xdr, &len)) {
-        /*remoteDispatchError (client, NULL, "%s", _("xdr_int failed (2)"));*/
+    if (!xdr_u_int (&xdr, &len)) {
+        /*remoteDispatchError (client, NULL, "%s", _("xdr_u_int failed (2)"));*/
         xdr_destroy (&xdr);
         return;
     }
@@ -4263,9 +4285,10 @@ remoteDispatchDomainEventSend (struct qe
     xdr_destroy (&xdr);
 
     /* Send it. */
-    client->mode = QEMUD_MODE_TX_PACKET;
-    client->bufferLength = len;
-    client->bufferOffset = 0;
+    msg->async = 1;
+    msg->bufferLength = len;
+    msg->bufferOffset = 0;
+    qemudClientMessageQueuePush(&client->tx, msg);
 }
 
 /*----- Helpers. -----*/
diff --git a/qemud/test_libvirtd.aug b/qemud/test_libvirtd.aug
--- a/qemud/test_libvirtd.aug
+++ b/qemud/test_libvirtd.aug
@@ -246,6 +246,19 @@ max_clients = 20
 # of clients allowed
 min_workers = 5
 max_workers = 20
+
+# Total global limit on concurrent RPC calls. Should be
+# at least as large as max_workers. Beyond this, RPC requests
+# will be read into memory and queued. This directly impact
+# memory usage, currently each request requires 256 KB of
+# memory. So by default upto 5 MB of memory is used
+max_requests = 20
+
+# Limit on concurrent requests from a single client
+# connection. To avoid one client monopolizing the server
+# this should be a small fraction of the global max_requests
+# and max_workers parameter
+max_client_requests = 5
 "
 
    test Libvirtd.lns get conf =
@@ -499,3 +512,16 @@ max_workers = 20
         { "#comment" = "of clients allowed"}
         { "min_workers" = "5" }
         { "max_workers" = "20" }
+	{ "#empty" }
+        { "#comment" = "Total global limit on concurrent RPC calls. Should be" }
+        { "#comment" = "at least as large as max_workers. Beyond this, RPC requests" }
+        { "#comment" = "will be read into memory and queued. This directly impact" }
+        { "#comment" = "memory usage, currently each request requires 256 KB of" }
+        { "#comment" = "memory. So by default upto 5 MB of memory is used" }
+        { "max_requests" = "20" }
+	{ "#empty" }
+        { "#comment" = "Limit on concurrent requests from a single client" }
+        { "#comment" = "connection. To avoid one client monopolizing the server" }
+        { "#comment" = "this should be a small fraction of the global max_requests" }
+        { "#comment" = "and max_workers parameter" }
+        { "max_client_requests" = "5" }
diff --git a/src/remote_internal.c b/src/remote_internal.c
--- a/src/remote_internal.c
+++ b/src/remote_internal.c
@@ -5663,13 +5663,13 @@ prepareCall(virConnectPtr conn,
     /* Length must include the length word itself (always encoded in
      * 4 bytes as per RFC 4506).
      */
-    rv->bufferLength += 4;
+    rv->bufferLength += REMOTE_MESSAGE_HEADER_XDR_LEN;
 
     /* Encode the length word. */
-    xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE);
-    if (!xdr_int (&xdr, (int *)&rv->bufferLength)) {
+    xdrmem_create (&xdr, rv->buffer, REMOTE_MESSAGE_HEADER_XDR_LEN, XDR_ENCODE);
+    if (!xdr_u_int (&xdr, &rv->bufferLength)) {
         error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
-               _("xdr_int (length word)"));
+               _("xdr_u_int (length word)"));
         goto error;
     }
     xdr_destroy (&xdr);
@@ -5965,20 +5965,26 @@ static int
 processCallRecvLen(virConnectPtr conn, struct private_data *priv,
                    int in_open) {
     XDR xdr;
-    int len;
+    unsigned int len;
 
     xdrmem_create (&xdr, priv->buffer, priv->bufferLength, XDR_DECODE);
-    if (!xdr_int (&xdr, &len)) {
+    if (!xdr_u_int (&xdr, &len)) {
         error (in_open ? NULL : conn,
-               VIR_ERR_RPC, _("xdr_int (length word, reply)"));
+               VIR_ERR_RPC, _("xdr_u_int (length word, reply)"));
         return -1;
     }
     xdr_destroy (&xdr);
 
+    if (len < REMOTE_MESSAGE_HEADER_XDR_LEN) {
+        error (in_open ? NULL : conn,
+               VIR_ERR_RPC, _("packet received from server too small"));
+        return -1;
+    }
+
     /* Length includes length word - adjust to real length to read. */
-    len -= 4;
-
-    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
+    len -= REMOTE_MESSAGE_HEADER_XDR_LEN;
+
+    if (len > REMOTE_MESSAGE_MAX) {
         error (in_open ? NULL : conn,
                VIR_ERR_RPC, _("packet received from server too large"));
         return -1;

-- 
|: Red Hat, Engineering, London   -o-   http://people.redhat.com/berrange/ :|
|: http://libvirt.org  -o-  http://virt-manager.org  -o-  http://ovirt.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505  -o-  F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]