[libvirt] [PATCH v2 30/38] virNetClientStream: Wire up VIR_NET_STREAM_SKIP

Michal Privoznik mprivozn at redhat.com
Thu Apr 20 10:01:59 UTC 2017


Whenever server sends a client stream packet (either regular with
actual data or stream skip one) it is queued on @st->rx. So the
list is a mixture of both types of stream packets. So now that we
have all the helpers needed we can wire their processing up. But
since virNetClientStreamRecvPacket doesn't support
VIR_STREAM_RECV_STOP_AT_HOLE flag yet, let's turn all received
skips into zeroes repeating requested times.

Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
---
 src/rpc/virnetclientstream.c | 44 ++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 42 insertions(+), 2 deletions(-)

diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index c773524..ff35137 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -296,6 +296,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
 
     virObjectLock(st);
 
+    /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP
+     * here just yet. We want in order processing! */
     virNetMessageQueuePush(&st->rx, tmp_msg);
 
     virNetClientStreamEventTimerUpdate(st);
@@ -359,7 +361,7 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st,
 }
 
 
-static int ATTRIBUTE_UNUSED
+static int
 virNetClientStreamHandleSkip(virNetClientPtr client,
                              virNetClientStreamPtr st)
 {
@@ -435,6 +437,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
     virCheckFlags(0, -1);
 
     virObjectLock(st);
+
+ reread:
     if (!st->rx && !st->incomingEOF) {
         virNetMessagePtr msg;
         int ret;
@@ -466,8 +470,44 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
     }
 
     VIR_DEBUG("After IO rx=%p", st->rx);
+
+    while (st->rx &&
+           st->rx->header.type == VIR_NET_STREAM_SKIP) {
+        /* Handle skip sent to us by server. */
+
+        if (virNetClientStreamHandleSkip(client, st) < 0)
+            goto cleanup;
+    }
+
+    if (!st->rx && !st->incomingEOF && !st->skipLength) {
+        if (nonblock) {
+            VIR_DEBUG("Non-blocking mode and no data available");
+            rv = -2;
+            goto cleanup;
+        }
+
+        /* We have consumed all packets from incoming queue but those
+         * were only skip packets, no data. Read the stream again. */
+        goto reread;
+    }
+
     want = nbytes;
-    while (want && st->rx) {
+
+    if (st->skipLength) {
+        /* Pretend skipLength zeroes was read from stream. */
+        size_t len = want;
+
+        if (len > st->skipLength)
+            len = st->skipLength;
+
+        memset(data, 0, len);
+        st->skipLength -= len;
+        want -= len;
+    }
+
+    while (want &&
+           st->rx &&
+           st->rx->header.type == VIR_NET_STREAM) {
         virNetMessagePtr msg = st->rx;
         size_t len = want;
 
-- 
2.10.2




More information about the libvir-list mailing list