[libvirt] [PATCH 15/16] Run tunnelled migration IO in separate thread

Daniel P. Berrange berrange at redhat.com
Wed May 11 09:10:01 UTC 2011


By running the doTunnelSendAll code in a separate thread, the
main thread can do qemuMigrationWaitForCompletion as with
normal migration. This in turn ensures that job signals work
correctly and that progress monitoring can be done

* src/qemu/qemu_migration.c: Runn tunnelled migration in
  separate thread
---
 src/qemu/qemu_migration.c |   95 ++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 81 insertions(+), 14 deletions(-)

diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index b8e595e..5413186 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -1289,44 +1289,101 @@ cleanup:
 
 #define TUNNEL_SEND_BUF_SIZE 65536
 
-static int doTunnelSendAll(virStreamPtr st,
-                           int sock)
+typedef struct _qemuMigrationIOThread  qemuMigrationIOThread;
+typedef qemuMigrationIOThread * qemuMigrationIOThreadPtr;
+struct _qemuMigrationIOThread {
+    virThread thread;
+    virStreamPtr st;
+    int sock;
+    virError err;
+};
+
+static void qemuMigrationIOFunc(void *arg)
 {
+    qemuMigrationIOThreadPtr data = arg;
     char *buffer;
     int nbytes = TUNNEL_SEND_BUF_SIZE;
 
     if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0) {
         virReportOOMError();
-        virStreamAbort(st);
-        return -1;
+        virStreamAbort(data->st);
+        goto error;
     }
 
     for (;;) {
-        nbytes = saferead(sock, buffer, TUNNEL_SEND_BUF_SIZE);
+        nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE);
         if (nbytes < 0) {
             virReportSystemError(errno, "%s",
                                  _("tunnelled migration failed to read from qemu"));
-            virStreamAbort(st);
+            virStreamAbort(data->st);
             VIR_FREE(buffer);
-            return -1;
+            goto error;
         }
         else if (nbytes == 0)
             /* EOF; get out of here */
             break;
 
-        if (virStreamSend(st, buffer, nbytes) < 0) {
+        if (virStreamSend(data->st, buffer, nbytes) < 0) {
             VIR_FREE(buffer);
-            return -1;
+            goto error;
         }
     }
 
     VIR_FREE(buffer);
 
-    if (virStreamFinish(st) < 0)
-        /* virStreamFinish set the error for us */
-        return -1;
+    if (virStreamFinish(data->st) < 0)
+        goto error;
 
-    return 0;
+    return;
+
+error:
+    virCopyLastError(&data->err);
+    virResetLastError();
+}
+
+
+static qemuMigrationIOThreadPtr
+qemuMigrationStartTunnel(virStreamPtr st,
+                         int sock)
+{
+    qemuMigrationIOThreadPtr io;
+
+    if (VIR_ALLOC(io) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    io->st = st;
+    io->sock = sock;
+
+    if (virThreadCreate(&io->thread, true,
+                        qemuMigrationIOFunc,
+                        io) < 0) {
+        VIR_FREE(io);
+        return NULL;
+    }
+
+    return io;
+}
+
+static int
+qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io)
+{
+    int rv = -1;
+    virThreadJoin(&io->thread);
+
+    /* Forward error from the IO thread, to this thread */
+    if (io->err.code != VIR_ERR_OK) {
+        virSetError(&io->err);
+        virResetError(&io->err);
+        goto cleanup;
+    }
+
+    rv = 0;
+
+cleanup:
+    VIR_FREE(io);
+    return rv;
 }
 
 
@@ -1351,6 +1408,7 @@ static int doTunnelMigrate(struct qemud_driver *driver,
     unsigned int background_flags = QEMU_MONITOR_MIGRATE_BACKGROUND;
     int ret = -1;
     qemuMigrationCookiePtr mig = NULL;
+    qemuMigrationIOThreadPtr iothread = NULL;
 
     if (!qemuCapsGet(priv->qemuCaps, QEMU_CAPS_MIGRATE_QEMU_UNIX) &&
         !qemuCapsGet(priv->qemuCaps, QEMU_CAPS_MIGRATE_QEMU_EXEC)) {
@@ -1486,7 +1544,16 @@ static int doTunnelMigrate(struct qemud_driver *driver,
         goto cancel;
     }
 
-    ret = doTunnelSendAll(st, client_sock);
+    if (!(iothread = qemuMigrationStartTunnel(st, client_sock)))
+        goto cancel;
+
+    ret = qemuMigrationWaitForCompletion(driver, vm);
+
+    /* Close now to ensure the IO thread quits & is joinable in next method */
+    VIR_FORCE_CLOSE(client_sock);
+
+    if (qemuMigrationStopTunnel(iothread) < 0)
+        ret = -1;
 
     if (ret == 0 &&
         qemuMigrationBakeCookie(mig, driver, vm, cookieout, cookieoutlen, 0) < 0)
-- 
1.7.4.4




More information about the libvir-list mailing list