[libvirt] [PATCH 13/21] qemu: migration: src: qemuNBDTunnelAcceptAndPipe

Pavel Boldin pboldin at mirantis.com
Wed Nov 18 18:13:11 UTC 2015


Add the qemuNBDTunnelAcceptAndPipe function, which is called to handle POLLIN
on the UNIX socket connection from QEMU's NBD server.

The function creates a pipe between a remote stream, connected to the QEMU
NBD Unix socket on the destination, and a local stream, connected to
the incoming connection from the source QEMU's NBD.

Signed-off-by: Pavel Boldin <pboldin at mirantis.com>
---
 src/qemu/qemu_migration.c | 134 +++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 132 insertions(+), 2 deletions(-)

diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index 0682fd8..0f35c13 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -3987,6 +3987,9 @@ struct _qemuMigrationSpec {
 
 #define TUNNEL_SEND_BUF_SIZE 65536
 
+typedef struct _qemuMigrationPipe qemuMigrationPipe; /* a local/remote stream pair */
+typedef qemuMigrationPipe *qemuMigrationPipePtr;
+
 typedef struct _qemuMigrationIOThread qemuMigrationIOThread;
 typedef qemuMigrationIOThread *qemuMigrationIOThreadPtr;
 struct _qemuMigrationIOThread {
@@ -3997,9 +4000,124 @@ struct _qemuMigrationIOThread {
     virError err;
     int wakeupRecvFD;
     int wakeupSendFD;
+    qemuMigrationPipePtr pipes;
+    virConnectPtr dconn;
+    unsigned char uuid[VIR_UUID_BUFLEN];
+};
+
+struct _qemuMigrationPipe { /* node of the list headed by qemuMigrationIOThread.pipes */
+    qemuMigrationPipePtr next;      /* next node, or NULL at the end of the list */
+    qemuMigrationIOThreadPtr data;  /* IO thread state the pipe was linked into */
+    virStreamPtr local;             /* stream opened on the fd accepted from QEMU */
+    virStreamPtr remote;            /* stream handed to virDomainMigrateOpenTunnel() */
 };
 
 static void
+qemuMigrationPipeClose(qemuMigrationPipePtr pipe, bool abort)
+{
+    virStreamEventUpdateCallback(pipe->local, 0);
+    virStreamEventUpdateCallback(pipe->remote, 0);
+
+    if (abort) {
+        virStreamAbort(pipe->local);
+        virStreamAbort(pipe->remote);
+    } else {
+        virStreamFinish(pipe->local);
+        virStreamFinish(pipe->remote);
+    }
+
+    virObjectUnref(pipe->local);
+    virObjectUnref(pipe->remote);
+}
+
+/* Allocate a pipe joining @local and @remote.  If the allocation fails, the
+ * event callbacks are removed from both streams and NULL is returned. */
+static qemuMigrationPipePtr
+qemuMigrationPipeCreate(virStreamPtr local, virStreamPtr remote)
+{
+    qemuMigrationPipePtr newPipe = NULL;
+
+    if (VIR_ALLOC(newPipe) < 0) {
+        virStreamEventRemoveCallback(local);
+        virStreamEventRemoveCallback(remote);
+        VIR_FREE(newPipe);
+        return NULL;
+    }
+
+    newPipe->local = local;
+    newPipe->remote = remote;
+
+    return newPipe;
+}
+
+
+/* Accept one connection on data->unixSock, open an NBD tunnel stream for
+ * data->uuid on data->dconn and push the resulting pipe onto data->pipes.
+ * Returns 0 on success, -1 on failure with the first error left in place. */
+static int
+qemuNBDTunnelAcceptAndPipe(qemuMigrationIOThreadPtr data)
+{
+    int fd;
+    virStreamPtr local = NULL, remote = NULL;
+    qemuMigrationPipePtr pipe = NULL;
+    virErrorPtr err;
+
+    while ((fd = accept(data->unixSock, NULL, NULL)) < 0) {
+        if (errno == EAGAIN || errno == EINTR)
+            continue; /* NOTE(review): spins on EAGAIN if unixSock is non-blocking - confirm */
+        virReportSystemError(
+            errno, "%s", _("failed to accept connection from qemu"));
+        goto abrt;
+    }
+
+    if (!(local = virStreamNew(data->dconn, VIR_STREAM_NONBLOCK)) ||
+        !(remote = virStreamNew(data->dconn, VIR_STREAM_NONBLOCK)) ||
+        virDomainMigrateOpenTunnel(data->dconn, remote, data->uuid,
+                                   VIR_MIGRATE_TUNNEL_NBD) < 0 ||
+        virFDStreamOpen(local, fd) < 0)
+        goto abrt;
+    fd = -1; /* 'local' owns the descriptor now; closing it below would double-close */
+
+    if (!(pipe = qemuMigrationPipeCreate(local, remote)))
+        goto abrt;
+
+    pipe->data = data;
+    pipe->next = data->pipes;
+    data->pipes = pipe;
+    return 0;
+
+ abrt:
+    err = virSaveLastError(); /* virStreamAbort() resets/replaces the last error */
+    VIR_FORCE_CLOSE(fd);
+    if (local)
+        virStreamAbort(local);
+    if (remote)
+        virStreamAbort(remote);
+    virObjectUnref(local);
+    virObjectUnref(remote);
+    if (err) {
+        virSetError(err);
+        virFreeError(err);
+    }
+    return -1;
+}
+
+/* Walk the singly linked list starting at @pipe, closing each entry with
+ * qemuMigrationPipeClose() (passing @abort through) and freeing it.
+ * The list head held by the caller is not updated. */
+static void
+qemuMigrationPipesStop(qemuMigrationPipePtr pipe, bool abort)
+{
+    qemuMigrationPipePtr following;
+
+    for (; pipe; pipe = following) {
+        following = pipe->next; /* read before the node is freed */
+        qemuMigrationPipeClose(pipe, abort);
+        VIR_FREE(pipe);
+    }
+}
+
+static void
 qemuMigrationIOFunc(void *arg)
 {
     qemuMigrationIOThreadPtr data = arg;
@@ -4081,9 +4199,14 @@ qemuMigrationIOFunc(void *arg)
                 break;
             }
         }
+
+        if (fds[2].revents & (POLLIN | POLLERR | POLLHUP) &&
+            qemuNBDTunnelAcceptAndPipe(data) < 0)
+            goto abrt;
     }
 
     virStreamFinish(data->qemuStream);
+    qemuMigrationPipesStop(data->pipes, false);
 
     VIR_FORCE_CLOSE(data->qemuSock);
     VIR_FREE(buffer);
@@ -4097,6 +4220,7 @@ qemuMigrationIOFunc(void *arg)
         err = NULL;
     }
     virStreamAbort(data->qemuStream);
+    qemuMigrationPipesStop(data->pipes, true);
     if (err) {
         virSetError(err);
         virFreeError(err);
@@ -4114,7 +4238,9 @@ qemuMigrationIOFunc(void *arg)
 
 
 static qemuMigrationIOThreadPtr
-qemuMigrationStartTunnel(virStreamPtr qemuStream)
+qemuMigrationStartTunnel(virStreamPtr qemuStream,
+                         virConnectPtr dconn,
+                         unsigned char uuid[VIR_UUID_BUFLEN])
 {
     qemuMigrationIOThreadPtr io = NULL;
     int wakeupFD[2] = { -1, -1 };
@@ -4132,6 +4258,8 @@ qemuMigrationStartTunnel(virStreamPtr qemuStream)
     io->qemuSock = io->unixSock = -1;
     io->wakeupRecvFD = wakeupFD[0];
     io->wakeupSendFD = wakeupFD[1];
+    io->dconn = dconn;
+    memcpy(io->uuid, uuid, VIR_UUID_BUFLEN);
 
     if (virThreadCreate(&io->thread, true,
                         qemuMigrationIOFunc,
@@ -4337,7 +4465,9 @@ qemuMigrationRun(virQEMUDriverPtr driver,
         VIR_WARN("unable to provide data for graphics client relocation");
 
     if (spec->fwdType != MIGRATION_FWD_DIRECT) {
-        if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream)))
+        if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream,
+                                                  dconn,
+                                                  mig->uuid)))
             goto cancel;
 
         if (nmigrate_disks &&
-- 
1.9.1




More information about the libvir-list mailing list