[libvirt] [PATCH 14/21] qemu: migration: src: stream piping

Pavel Boldin pboldin at mirantis.com
Wed Nov 18 18:13:12 UTC 2015


Add and use qemuMigrationPipeEvent piped streams' event handler. It
sets the appropriate event flags for each of the stream and pumps the
pipe using qemuMigrationPipeIO whenever there is a data at any end.

Signed-off-by: Pavel Boldin <pboldin at mirantis.com>
---
 src/qemu/qemu_migration.c | 85 ++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 84 insertions(+), 1 deletion(-)

diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index 0f35c13..43f71e9 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -4010,8 +4010,28 @@ struct _qemuMigrationPipe {
     qemuMigrationIOThreadPtr data;
     virStreamPtr local;
     virStreamPtr remote;
+
+    int local_flags : 4;
+    int remote_flags : 4;
+    char buffer[TUNNEL_SEND_BUF_SIZE];
 };
 
+static int
+qemuMigrationPipeIO(virStreamPtr from, virStreamPtr to, char *buffer)
+{
+    int done, got, offset = 0;
+    got = virStreamRecv(from, buffer, TUNNEL_SEND_BUF_SIZE);
+
+    while (offset < got) {
+        done = virStreamSend(to, buffer + offset, got - offset);
+        if (done < 0)
+            break;
+        offset += done;
+    }
+
+    return got;
+}
+
 static void
 qemuMigrationPipeClose(qemuMigrationPipePtr pipe, bool abort)
 {
@@ -4030,6 +4050,55 @@ qemuMigrationPipeClose(qemuMigrationPipePtr pipe, bool abort)
     virObjectUnref(pipe->remote);
 }
 
+static void
+qemuMigrationPipeEvent(virStreamPtr stream, int events, void *opaque)
+{
+    qemuMigrationPipePtr pipe = opaque;
+
+    if (stream == pipe->remote)
+        pipe->remote_flags |= events;
+    if (stream == pipe->local)
+        pipe->local_flags |= events;
+
+    VIR_DEBUG("remote = %p, remote_flags = %x, local = %p, local_flags = %x",
+              pipe->remote, pipe->remote_flags,
+              pipe->local, pipe->local_flags);
+
+    if (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP)) {
+        char dummy;
+        virStreamRecv(stream, &dummy, 1);
+ abrt:
+        virCopyLastError(&pipe->data->err);
+        qemuMigrationPipeClose(pipe, true);
+        if (safewrite(pipe->data->wakeupSendFD, "c", 1) != 1) {
+            virReportSystemError(errno, "%s",
+                                 _("failed to stop migration tunnel"));
+        }
+        return;
+    }
+
+    if ((pipe->remote_flags & VIR_STREAM_EVENT_READABLE) &&
+        (pipe->local_flags & VIR_STREAM_EVENT_WRITABLE)) {
+
+        if (qemuMigrationPipeIO(pipe->remote, pipe->local, pipe->buffer) == -1)
+            goto abrt;
+
+        pipe->remote_flags &= ~VIR_STREAM_EVENT_READABLE;
+        pipe->local_flags &= ~VIR_STREAM_EVENT_WRITABLE;
+    }
+
+    if ((pipe->local_flags & VIR_STREAM_EVENT_READABLE) &&
+        (pipe->remote_flags & VIR_STREAM_EVENT_WRITABLE)) {
+
+        if (qemuMigrationPipeIO(pipe->local, pipe->remote, pipe->buffer) == -1)
+            goto abrt;
+
+        pipe->local_flags &= ~VIR_STREAM_EVENT_READABLE;
+        pipe->remote_flags &= ~VIR_STREAM_EVENT_WRITABLE;
+    }
+}
+
+
 static qemuMigrationPipePtr
 qemuMigrationPipeCreate(virStreamPtr local, virStreamPtr remote)
 {
@@ -4041,6 +4110,20 @@ qemuMigrationPipeCreate(virStreamPtr local, virStreamPtr remote)
     pipe->local = local;
     pipe->remote = remote;
 
+    if (virStreamEventAddCallback(local,
+                                  VIR_STREAM_EVENT_READABLE |
+                                  VIR_STREAM_EVENT_WRITABLE,
+                                  qemuMigrationPipeEvent,
+                                  pipe, NULL) < 0)
+        goto error;
+
+    if (virStreamEventAddCallback(remote,
+                                  VIR_STREAM_EVENT_READABLE |
+                                  VIR_STREAM_EVENT_WRITABLE,
+                                  qemuMigrationPipeEvent,
+                                  pipe, NULL) < 0)
+        goto error;
+
     return pipe;
 
  error:
@@ -4230,7 +4313,7 @@ qemuMigrationIOFunc(void *arg)
     /* Let the source qemu know that the transfer cant continue anymore.
      * Don't copy the error for EPIPE as destination has the actual error. */
     VIR_FORCE_CLOSE(data->qemuSock);
-    if (!virLastErrorIsSystemErrno(EPIPE))
+    if (data->err.code == VIR_ERR_OK && !virLastErrorIsSystemErrno(EPIPE))
         virCopyLastError(&data->err);
     virResetLastError();
     VIR_FREE(buffer);
-- 
1.9.1




More information about the libvir-list mailing list