[libvirt] [PATCH 05/11] Handle outgoing data streams in libvirtd

Daniel P. Berrange berrange at redhat.com
Mon Aug 24 20:51:08 UTC 2009


* qemud/dispatch.c: Set streamTX flag on outgoing data packets
* qemud/qemud.h: Add streamTX flag to track outgoing data
* qemud/qemud.c: Re-enable further TX when outgoing data packet
  has been fully sent.
* qemud/stream.h, qemud/stream.c: Add method for enabling TX.
  Support reading from streams and transmitting data out to client
---
 qemud/dispatch.c |    2 +
 qemud/qemud.c    |    4 ++-
 qemud/qemud.h    |    1 +
 qemud/stream.c   |   96 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 qemud/stream.h   |    4 ++
 5 files changed, 106 insertions(+), 1 deletions(-)

diff --git a/qemud/dispatch.c b/qemud/dispatch.c
index 1934d24..7417001 100644
--- a/qemud/dispatch.c
+++ b/qemud/dispatch.c
@@ -636,6 +636,8 @@ remoteSendStreamData(struct qemud_client *client,
 
         DEBUG("Total %d", msg->bufferOffset);
     }
+    if (data)
+        msg->streamTX = 1;
 
     /* Reset ready for I/O */
     msg->bufferLength = msg->bufferOffset;
diff --git a/qemud/qemud.c b/qemud/qemud.c
index 6c81dec..af71495 100644
--- a/qemud/qemud.c
+++ b/qemud/qemud.c
@@ -1893,7 +1893,9 @@ void
 qemudClientMessageRelease(struct qemud_client *client,
                           struct qemud_client_message *msg)
 {
-    if (!msg->async)
+    if (msg->streamTX) {
+        remoteStreamMessageFinished(client, msg);
+    } else if (!msg->async)
         client->nrequests--;
 
     /* See if the recv queue is currently throttled */
diff --git a/qemud/qemud.h b/qemud/qemud.h
index 8ef5871..911cdc3 100644
--- a/qemud/qemud.h
+++ b/qemud/qemud.h
@@ -130,6 +130,7 @@ struct qemud_client_message {
     unsigned int bufferOffset;
 
     unsigned int async : 1;
+    unsigned int streamTX : 1;
 
     remote_message_header hdr;
 
diff --git a/qemud/stream.c b/qemud/stream.c
index 1fe0e58..584268d 100644
--- a/qemud/stream.c
+++ b/qemud/stream.c
@@ -32,6 +32,9 @@ static int
 remoteStreamHandleWrite(struct qemud_client *client,
                         struct qemud_client_stream *stream);
 static int
+remoteStreamHandleRead(struct qemud_client *client,
+                       struct qemud_client_stream *stream);
+static int
 remoteStreamHandleFinish(struct qemud_client *client,
                          struct qemud_client_stream *stream,
                          struct qemud_client_message *msg);
@@ -48,6 +51,8 @@ remoteStreamUpdateEvents(struct qemud_client_stream *stream)
     int newEvents = 0;
     if (stream->rx)
         newEvents |= VIR_STREAM_EVENT_WRITABLE;
+    if (stream->tx && !stream->recvEOF)
+        newEvents |= VIR_STREAM_EVENT_READABLE;
 
     virStreamEventUpdateCallback(stream->st, newEvents);
 }
@@ -87,6 +92,16 @@ remoteStreamEvent(virStreamPtr st, int events, void *opaque)
         }
     }
 
+    if (!stream->recvEOF &&
+        (events & (VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP))) {
+        events = events & ~(VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP);
+        if (remoteStreamHandleRead(client, stream) < 0) {
+            remoteRemoveClientStream(client, stream);
+            qemudDispatchClientFailure(client);
+            goto cleanup;
+        }
+    }
+
     if (!stream->closed &&
         (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) {
         int ret;
@@ -507,3 +522,84 @@ remoteStreamHandleWrite(struct qemud_client *client,
 
     return 0;
 }
+
+
+
+/*
+ * Invoked when a stream is signalled as having data available
+ * to read. Reads up to REMOTE_MESSAGE_PAYLOAD_MAX bytes (one
+ * message worth) and queues that data for transmission to the
+ * client; a zero-length read also marks the stream as recvEOF.
+ *
+ * Returns 0 if data was queued for TX, or an error RPC was
+ * sent, or -1 on fatal error, indicating the client should
+ * be killed.
+ */
+static int
+remoteStreamHandleRead(struct qemud_client *client,
+                       struct qemud_client_stream *stream)
+{
+    char *buffer;
+    size_t bufferLen = REMOTE_MESSAGE_PAYLOAD_MAX; /* read at most one payload per call */
+    int ret;
+
+    DEBUG("stream=%p", stream);
+
+    /* Shouldn't ever be called unless we're marked able to
+     * transmit, but it doesn't hurt to check; nothing is read here */
+    if (!stream->tx)
+        return 0;
+
+    if (VIR_ALLOC_N(buffer, bufferLen) < 0)
+        return -1;
+
+    ret = virStreamRecv(stream->st, buffer, bufferLen);
+    if (ret == -2) {
+        /* Should never get this, since we're only called when we know
+         * we're readable, but hey things change... */
+        ret = 0; /* nothing queued; report success and leave tx set */
+    } else if (ret < 0) {
+        remote_error rerr;
+        memset(&rerr, 0, sizeof rerr);
+        remoteDispatchConnError(&rerr, NULL);
+
+        ret = remoteSerializeStreamError(client, &rerr, stream->procedure, stream->serial);
+    } else {
+        stream->tx = 0; /* cleared here; remoteStreamMessageFinished() sets it again */
+        if (ret == 0)
+            stream->recvEOF = 1; /* zero bytes read: record EOF on the stream */
+        ret = remoteSendStreamData(client, stream, buffer, ret);
+    }
+
+    VIR_FREE(buffer);
+    return ret;
+}
+
+
+/*
+ * Invoked when an outgoing data packet message has been fully sent.
+ * Looks up the client's stream matching the message's proc and serial,
+ * sets its tx flag and refreshes its events; no match is a no-op.
+ * The idea is to stop the daemon growing without bound due to a
+ * fast stream but a slow client.
+ */
+void
+remoteStreamMessageFinished(struct qemud_client *client,
+                            struct qemud_client_message *msg)
+{
+    struct qemud_client_stream *stream = client->streams; /* head of the client's stream list */
+
+    while (stream) {
+        if (msg->hdr.proc == stream->procedure &&
+            msg->hdr.serial == stream->serial)
+            break;
+        stream = stream->next;
+    }
+
+    DEBUG("Message client=%p stream=%p proc=%d serial=%d", client, stream, msg->hdr.proc, msg->hdr.serial);
+
+    if (stream) { /* NULL when no stream matched the message header */
+        stream->tx = 1;
+        remoteStreamUpdateEvents(stream);
+    }
+}
diff --git a/qemud/stream.h b/qemud/stream.h
index fe5ce6f..de738ba 100644
--- a/qemud/stream.h
+++ b/qemud/stream.h
@@ -46,4 +46,8 @@ int
 remoteRemoveClientStream(struct qemud_client *client,
                          struct qemud_client_stream *stream);
 
+void
+remoteStreamMessageFinished(struct qemud_client *client,
+                            struct qemud_client_message *msg);
+
 #endif /* __LIBVIRTD_STREAM_H__ */
-- 
1.6.2.5




More information about the libvir-list mailing list