[libvirt] [PATCH 1/6] Enhance the streams helper to support plain file I/O

Daniel P. Berrange berrange at redhat.com
Tue Mar 22 11:29:30 UTC 2011


The O_NONBLOCK flag doesn't work as desired on plain files
or block devices. Introduce an I/O helper program that does
the blocking I/O operations, communicating over a pipe that
can support O_NONBLOCK.

* src/fdstream.c, src/fdstream.h: Add non-blocking I/O
  on plain files/block devices
* src/Makefile.am, src/util/iohelper.c: I/O helper program
* src/qemu/qemu_driver.c, src/lxc/lxc_driver.c,
  src/uml/uml_driver.c, src/xen/xen_driver.c: Update for
  streams API change
---
 po/POTFILES.in         |    1 +
 src/Makefile.am        |   12 +++
 src/fdstream.c         |  222 ++++++++++++++++++++++++++++++++++++------------
 src/fdstream.h         |    5 +
 src/lxc/lxc_driver.c   |    2 +-
 src/qemu/qemu_driver.c |    2 +-
 src/uml/uml_driver.c   |    2 +-
 src/util/iohelper.c    |  208 +++++++++++++++++++++++++++++++++++++++++++++
 src/xen/xen_driver.c   |    2 +-
 9 files changed, 396 insertions(+), 60 deletions(-)
 create mode 100644 src/util/iohelper.c

diff --git a/po/POTFILES.in b/po/POTFILES.in
index 805e5ca..12adb3e 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -94,6 +94,7 @@ src/util/event_poll.c
 src/util/hash.c
 src/util/hooks.c
 src/util/hostusb.c
+src/util/iohelper.c
 src/util/interface.c
 src/util/iptables.c
 src/util/json.c
diff --git a/src/Makefile.am b/src/Makefile.am
index c3729a6..1d8115b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -380,6 +380,9 @@ STORAGE_DRIVER_DISK_SOURCES =					\
 STORAGE_HELPER_DISK_SOURCES =					\
 		storage/parthelper.c
 
+UTIL_IO_HELPER_SOURCES =					\
+		util/iohelper.c
+
 # Network filters
 NWFILTER_DRIVER_SOURCES =					\
 		nwfilter/nwfilter_driver.h nwfilter/nwfilter_driver.c	\
@@ -1203,6 +1206,15 @@ EXTRA_DIST += $(LIBVIRT_QEMU_SYMBOL_FILE)
 
 libexec_PROGRAMS =
 
+libexec_PROGRAMS += libvirt_iohelper
+libvirt_iohelper_SOURCES = $(UTIL_IO_HELPER_SOURCES)
+libvirt_iohelper_LDFLAGS = $(WARN_LDFLAGS) $(AM_LDFLAGS)
+libvirt_iohelper_LDADD =		\
+		libvirt_util.la		\
+		../gnulib/lib/libgnu.la
+
+libvirt_iohelper_CFLAGS = $(AM_CFLAGS)
+
 if WITH_STORAGE_DISK
 if WITH_LIBVIRTD
 libexec_PROGRAMS += libvirt_parthelper
diff --git a/src/fdstream.c b/src/fdstream.c
index 701fafc..6d1ad95 100644
--- a/src/fdstream.c
+++ b/src/fdstream.c
@@ -34,10 +34,12 @@
 #include "fdstream.h"
 #include "virterror_internal.h"
 #include "datatypes.h"
+#include "logging.h"
 #include "memory.h"
 #include "event.h"
 #include "util.h"
 #include "files.h"
+#include "configmake.h"
 
 #define VIR_FROM_THIS VIR_FROM_STREAMS
 #define streamsReportError(code, ...)                                \
@@ -47,6 +49,10 @@
 /* Tunnelled migration stream support */
 struct virFDStreamData {
     int fd;
+    int errfd;
+    virCommandPtr cmd;
+    unsigned long long offset;
+    unsigned long long length;
 
     int watch;
     unsigned int cbRemoved;
@@ -206,6 +212,28 @@ static int virFDStreamFree(struct virFDStreamData *fdst)
 {
     int ret;
     ret = VIR_CLOSE(fdst->fd);
+    if (fdst->cmd) {
+        ssize_t len = 1024;
+        char buf[len];
+        int status;
+        if ((len = saferead(fdst->errfd, buf, sizeof(buf)-1)) < 0)
+            buf[0] = '\0';
+        else
+            buf[len] = '\0';
+
+        if (virCommandWait(fdst->cmd, &status) < 0) {
+            ret = -1;
+        } else if (status != 0) {
+            if (buf[0] == '\0')
+                streamsReportError(VIR_ERR_INTERNAL_ERROR,
+                                   _("I/O helper exited with status %d"), status);
+            else
+                streamsReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                                   buf);
+            ret = -1;
+        }
+        virCommandFree(fdst->cmd);
+    }
     VIR_FREE(fdst);
     return ret;
 }
@@ -217,6 +245,8 @@ virFDStreamClose(virStreamPtr st)
     struct virFDStreamData *fdst = st->privateData;
     int ret;
 
+    VIR_DEBUG("st=%p", st);
+
     if (!fdst)
         return 0;
 
@@ -250,6 +280,18 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
 
     virMutexLock(&fdst->lock);
 
+    if (fdst->length) {
+        if (fdst->length == fdst->offset) {
+            virReportSystemError(ENOSPC, "%s",
+                                 _("cannot write to stream"));
+            virMutexUnlock(&fdst->lock);
+            return -1;
+        }
+
+        if ((fdst->length - fdst->offset) < nbytes)
+            nbytes = fdst->length - fdst->offset;
+    }
+
 retry:
     ret = write(fdst->fd, bytes, nbytes);
     if (ret < 0) {
@@ -262,6 +304,8 @@ retry:
             virReportSystemError(errno, "%s",
                                  _("cannot write to stream"));
         }
+    } else if (fdst->length) {
+        fdst->offset += ret;
     }
 
     virMutexUnlock(&fdst->lock);
@@ -288,6 +332,16 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
 
     virMutexLock(&fdst->lock);
 
+    if (fdst->length) {
+        if (fdst->length == fdst->offset) {
+            virMutexUnlock(&fdst->lock);
+            return 0;
+        }
+
+        if ((fdst->length - fdst->offset) < nbytes)
+            nbytes = fdst->length - fdst->offset;
+    }
+
 retry:
     ret = read(fdst->fd, bytes, nbytes);
     if (ret < 0) {
@@ -300,6 +354,8 @@ retry:
             virReportSystemError(errno, "%s",
                                  _("cannot read from stream"));
         }
+    } else if (fdst->length) {
+        fdst->offset += ret;
     }
 
     virMutexUnlock(&fdst->lock);
@@ -317,11 +373,17 @@ static virStreamDriver virFDStreamDrv = {
     .streamRemoveCallback = virFDStreamRemoveCallback
 };
 
-int virFDStreamOpen(virStreamPtr st,
-                    int fd)
+static int virFDStreamOpenInternal(virStreamPtr st,
+                                   int fd,
+                                   virCommandPtr cmd,
+                                   int errfd,
+                                   unsigned long long length)
 {
     struct virFDStreamData *fdst;
 
+    VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu",
+              st, fd, cmd, errfd, length);
+
     if ((st->flags & VIR_STREAM_NONBLOCK) &&
         virSetNonBlock(fd) < 0)
         return -1;
@@ -332,6 +394,9 @@ int virFDStreamOpen(virStreamPtr st,
     }
 
     fdst->fd = fd;
+    fdst->cmd = cmd;
+    fdst->errfd = errfd;
+    fdst->length = length;
     if (virMutexInit(&fdst->lock) < 0) {
         VIR_FREE(fdst);
         streamsReportError(VIR_ERR_INTERNAL_ERROR, "%s",
@@ -346,6 +411,13 @@ int virFDStreamOpen(virStreamPtr st,
 }
 
 
+/* Wrap an open fd as a stream with no helper process and no length limit. */
+int virFDStreamOpen(virStreamPtr st, int fd)
+{
+    return virFDStreamOpenInternal(st, fd, NULL, -1, 0);
+}
+
+
 #if HAVE_SYS_UN_H
 int virFDStreamConnectUNIX(virStreamPtr st,
                            const char *path,
@@ -387,7 +459,7 @@ int virFDStreamConnectUNIX(virStreamPtr st,
         goto error;
     } while ((++i <= timeout*5) && (usleep(.2 * 1000000) <= 0));
 
-    if (virFDStreamOpen(st, fd) < 0)
+    if (virFDStreamOpenInternal(st, fd, NULL, -1, 0) < 0)
         goto error;
     return 0;
 
@@ -406,19 +478,28 @@ int virFDStreamConnectUNIX(virStreamPtr st ATTRIBUTE_UNUSED,
 }
 #endif
 
-int virFDStreamOpenFile(virStreamPtr st,
-                        const char *path,
-                        int flags)
+static int
+virFDStreamOpenFileInternal(virStreamPtr st,
+                            const char *path,
+                            unsigned long long offset,
+                            unsigned long long length,
+                            int flags,
+                            int mode)
 {
-    int fd;
+    int fd = -1;
+    int fds[2] = { -1, -1 };
     struct stat sb;
+    virCommandPtr cmd = NULL;
+    int errfd = -1;
 
-    if (flags & O_CREAT) {
-        streamsReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                           _("Unexpected O_CREAT flag when opening existing file"));
-    }
+    VIR_DEBUG("st=%p path=%s flags=%d offset=%llu length=%llu mode=%d",
+              st, path, flags, offset, length, mode);
 
-    if ((fd  = open(path, flags)) < 0) {
+    if (flags & O_CREAT)
+        fd = open(path, flags, mode);
+    else
+        fd = open(path, flags);
+    if (fd < 0) {
         virReportSystemError(errno,
                              _("Unable to open stream for '%s'"),
                              path);
@@ -440,64 +521,93 @@ int virFDStreamOpenFile(virStreamPtr st,
     if ((st->flags & VIR_STREAM_NONBLOCK) &&
         (!S_ISCHR(sb.st_mode) &&
          !S_ISFIFO(sb.st_mode))) {
-        streamsReportError(VIR_ERR_INTERNAL_ERROR,
-                           _("Non-blocking I/O is not supported on %s"),
-                           path);
-        goto error;
-    }
+        int childfd;
 
-    if (virFDStreamOpen(st, fd) < 0)
-        goto error;
+        if ((flags & O_RDWR) == O_RDWR) {
+            streamsReportError(VIR_ERR_INTERNAL_ERROR,
+                               _("%s: Cannot request read and write flags together"),
+                               path);
+            goto error;
+        }
 
-    return 0;
+        VIR_FORCE_CLOSE(fd);
+        if (pipe(fds) < 0) {
+            virReportSystemError(errno, "%s",
+                                 _("Unable to create pipe"));
+            goto error;
+        }
 
-error:
-    VIR_FORCE_CLOSE(fd);
-    return -1;
-}
+        cmd = virCommandNewArgList(LIBEXECDIR "/libvirt_iohelper",
+                                   path,
+                                   NULL);
+        virCommandAddArgFormat(cmd, "%d", flags);
+        virCommandAddArgFormat(cmd, "%d", mode);
+        virCommandAddArgFormat(cmd, "%llu", offset);
+        virCommandAddArgFormat(cmd, "%llu", length);
 
-int virFDStreamCreateFile(virStreamPtr st,
-                          const char *path,
-                          int flags,
-                          mode_t mode)
-{
-    int fd = open(path, flags, mode);
-    struct stat sb;
+        if (!cmd)
+            goto error;
 
-    if (fd < 0) {
-        virReportSystemError(errno,
-                             _("Unable to open stream for '%s'"),
-                             path);
-        return -1;
-    }
+        //virCommandDaemonize(cmd);
+        if (flags == O_RDONLY) {
+            childfd = fds[1];
+            fd = fds[0];
+            virCommandSetOutputFD(cmd, &childfd);
+        } else {
+            childfd = fds[0];
+            fd = fds[1];
+            virCommandSetInputFD(cmd, childfd);
+        }
+        virCommandSetErrorFD(cmd, &errfd);
 
-    if (fstat(fd, &sb) < 0) {
-        virReportSystemError(errno,
-                             _("Unable to access stream for '%s'"),
-                             path);
-        goto error;
-    }
+        if (virCommandRunAsync(cmd, NULL) < 0)
+            goto error;
 
-    /* Thanks to the POSIX i/o model, we can't reliably get
-     * non-blocking I/O on block devs/regular files. To
-     * support those we need to fork a helper process todo
-     * the I/O so we just have a fifo. Or use AIO :-(
-     */
-    if ((st->flags & VIR_STREAM_NONBLOCK) &&
-        (!S_ISCHR(sb.st_mode) &&
-         !S_ISFIFO(sb.st_mode))) {
-        streamsReportError(VIR_ERR_INTERNAL_ERROR,
-                           _("Non-blocking I/O is not supported on %s"),
-                           path);
-        goto error;
+        VIR_FORCE_CLOSE(childfd);
+    } else {
+        if (offset &&
+            lseek(fd, offset, SEEK_SET) != offset) {
+                virReportSystemError(errno,
+                                     _("Unable to seek %s to %llu"),
+                                     path, offset);
+                goto error;
+        }
     }
 
-    if (virFDStreamOpen(st, fd) < 0)
+    if (virFDStreamOpenInternal(st, fd, cmd, errfd, length) < 0)
         goto error;
 
     return 0;
 
 error:
+    virCommandFree(cmd);
+    VIR_FORCE_CLOSE(fds[0]);
+    VIR_FORCE_CLOSE(fds[1]);
     VIR_FORCE_CLOSE(fd);
     return -1;
 }
+
+/* Stream an existing file; O_CREAT is refused since no mode is supplied. */
+int virFDStreamOpenFile(virStreamPtr st, const char *path,
+                        unsigned long long offset,
+                        unsigned long long length,
+                        int flags)
+{
+    if (!(flags & O_CREAT))
+        return virFDStreamOpenFileInternal(st, path, offset, length, flags, 0);
+
+    streamsReportError(VIR_ERR_INTERNAL_ERROR,
+                       _("Attempt to create %s without specifying mode"),
+                       path);
+    return -1;
+}
+
+/* Stream a file that open() may create, applying mode when O_CREAT is set. */
+int virFDStreamCreateFile(virStreamPtr st, const char *path,
+                          unsigned long long offset,
+                          unsigned long long length,
+                          int flags,
+                          mode_t mode)
+{
+    return virFDStreamOpenFileInternal(st, path, offset, length, flags, mode);
+}
diff --git a/src/fdstream.h b/src/fdstream.h
index 53cbaa7..6b395b6 100644
--- a/src/fdstream.h
+++ b/src/fdstream.h
@@ -24,6 +24,7 @@
 # define __VIR_FDSTREAM_H_
 
 # include "internal.h"
+# include "command.h"
 
 int virFDStreamOpen(virStreamPtr st,
                     int fd);
@@ -34,9 +35,13 @@ int virFDStreamConnectUNIX(virStreamPtr st,
 
 int virFDStreamOpenFile(virStreamPtr st,
                         const char *path,
+                        unsigned long long offset,
+                        unsigned long long length,
                         int flags);
 int virFDStreamCreateFile(virStreamPtr st,
                           const char *path,
+                          unsigned long long offset,
+                          unsigned long long length,
                           int flags,
                           mode_t mode);
 
diff --git a/src/lxc/lxc_driver.c b/src/lxc/lxc_driver.c
index 60d4204..815c5f6 100644
--- a/src/lxc/lxc_driver.c
+++ b/src/lxc/lxc_driver.c
@@ -2776,7 +2776,7 @@ lxcDomainOpenConsole(virDomainPtr dom,
         goto cleanup;
     }
 
-    if (virFDStreamOpenFile(st, chr->source.data.file.path, O_RDWR) < 0)
+    if (virFDStreamOpenFile(st, chr->source.data.file.path, 0, 0, O_RDWR) < 0)
         goto cleanup;
 
     ret = 0;
diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index c7d4262..2cb36b3 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -7015,7 +7015,7 @@ qemuDomainOpenConsole(virDomainPtr dom,
         goto cleanup;
     }
 
-    if (virFDStreamOpenFile(st, chr->source.data.file.path, O_RDWR) < 0)
+    if (virFDStreamOpenFile(st, chr->source.data.file.path, 0, 0, O_RDWR) < 0)
         goto cleanup;
 
     ret = 0;
diff --git a/src/uml/uml_driver.c b/src/uml/uml_driver.c
index 538d5f7..d364597 100644
--- a/src/uml/uml_driver.c
+++ b/src/uml/uml_driver.c
@@ -2126,7 +2126,7 @@ umlDomainOpenConsole(virDomainPtr dom,
         goto cleanup;
     }
 
-    if (virFDStreamOpenFile(st, chr->source.data.file.path, O_RDWR) < 0)
+    if (virFDStreamOpenFile(st, chr->source.data.file.path, 0, 0, O_RDWR) < 0)
         goto cleanup;
 
     ret = 0;
diff --git a/src/util/iohelper.c b/src/util/iohelper.c
new file mode 100644
index 0000000..07aef34
--- /dev/null
+++ b/src/util/iohelper.c
@@ -0,0 +1,208 @@
+/*
+ * iohelper.c: Helper program to perform I/O operations on files
+ *
+ * Copyright (C) 2011 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Daniel P. Berrange <berrange at redhat.com>
+ *
+ * Current support
+ *   - Read existing file
+ *   - Write existing file
+ *   - Create & write new file
+ */
+
+#include <config.h>
+
+#include <locale.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "util.h"
+#include "threads.h"
+#include "files.h"
+#include "memory.h"
+#include "virterror_internal.h"
+#include "configmake.h"
+
+#define VIR_FROM_THIS VIR_FROM_STORAGE
+
+/* Copy up to LENGTH bytes (everything if 0) between PATH and stdin/stdout. */
+static int runIO(const char *path,
+                 int flags,
+                 int mode,
+                 unsigned long long offset,
+                 unsigned long long length)
+{
+    size_t chunk = 1024*1024;
+    unsigned long long copied = 0;
+    char *buf = NULL;
+    const char *srcname, *dstname;
+    int srcfd, dstfd;
+    int fd;
+    int ret = -1;
+
+    /* open() only takes a mode argument when it may create the file */
+    fd = (flags & O_CREAT) ? open(path, flags, mode) : open(path, flags);
+    if (fd < 0) {
+        virReportSystemError(errno, _("Unable to open %s"), path);
+        goto cleanup;
+    }
+
+    if (offset && lseek(fd, offset, SEEK_SET) < 0) {
+        virReportSystemError(errno, _("Unable to seek %s to %llu"),
+                             path, offset);
+        goto cleanup;
+    }
+
+    if (VIR_ALLOC_N(buf, chunk) < 0) {
+        virReportOOMError();
+        goto cleanup;
+    }
+
+    /* The access mode selects the direction of the copy */
+    switch (flags & O_ACCMODE) {
+    case O_RDONLY:
+        srcfd = fd;
+        srcname = path;
+        dstfd = STDOUT_FILENO;
+        dstname = "stdout";
+        break;
+
+    case O_WRONLY:
+        srcfd = STDIN_FILENO;
+        srcname = "stdin";
+        dstfd = fd;
+        dstname = path;
+        break;
+
+    case O_RDWR:
+    default:
+        virReportSystemError(EINVAL,
+                             _("Unable to process file with flags %d"),
+                             (flags & O_ACCMODE));
+        goto cleanup;
+    }
+
+    for (;;) {
+        ssize_t nread;
+
+        /* With a length limit, never read past the bytes still owed */
+        if (length && (length - copied) < chunk)
+            chunk = length - copied;
+
+        if (chunk == 0)
+            break;
+
+        nread = saferead(srcfd, buf, chunk);
+        if (nread < 0) {
+            virReportSystemError(errno, _("Unable to read %s"), srcname);
+            goto cleanup;
+        }
+        if (nread == 0)
+            break;
+
+        copied += nread;
+        if (safewrite(dstfd, buf, nread) < 0) {
+            virReportSystemError(errno, _("Unable to write %s"), dstname);
+            goto cleanup;
+        }
+    }
+
+    ret = 0;
+
+cleanup:
+    /* Always close; a close failure is only reported after a clean copy */
+    if (VIR_CLOSE(fd) < 0 && ret == 0) {
+        virReportSystemError(errno, _("Unable to close %s"), path);
+        ret = -1;
+    }
+
+    VIR_FREE(buf);
+    return ret;
+}
+
+/* Copy between FILENAME and stdin/stdout; exits non-zero on any failure. */
+int main(int argc, char **argv)
+{
+    const char *path;
+    virErrorPtr err;
+    unsigned long long offset;
+    unsigned long long length;
+    int flags;
+    int mode;
+
+    if (setlocale(LC_ALL, "") == NULL ||
+        bindtextdomain(PACKAGE, LOCALEDIR) == NULL ||
+        textdomain(PACKAGE) == NULL) {
+        fprintf(stderr, _("%s: initialization failed\n"), argv[0]);
+        exit(EXIT_FAILURE);
+    }
+
+    if (virThreadInitialize() < 0 ||
+        virErrorInitialize() < 0 ||
+        virRandomInitialize(time(NULL) ^ getpid())) {
+        fprintf(stderr, _("%s: initialization failed\n"), argv[0]);
+        exit(EXIT_FAILURE);
+    }
+
+    if (argc != 6) {
+        fprintf(stderr, _("%s: syntax FILENAME FLAGS MODE OFFSET LENGTH\n"), argv[0]);
+        exit(EXIT_FAILURE);
+    }
+
+    path = argv[1];
+
+    if (virStrToLong_i(argv[2], NULL, 10, &flags) < 0) {
+        fprintf(stderr, _("%s: malformed file flags %s\n"), argv[0], argv[2]);
+        exit(EXIT_FAILURE);
+    }
+
+    if (virStrToLong_i(argv[3], NULL, 10, &mode) < 0) {
+        fprintf(stderr, _("%s: malformed file mode %s\n"), argv[0], argv[3]);
+        exit(EXIT_FAILURE);
+    }
+
+    if (virStrToLong_ull(argv[4], NULL, 10, &offset) < 0) {
+        fprintf(stderr, _("%s: malformed file offset %s\n"), argv[0], argv[4]);
+        exit(EXIT_FAILURE);
+    }
+    if (virStrToLong_ull(argv[5], NULL, 10, &length) < 0) {
+        fprintf(stderr, _("%s: malformed file length %s\n"), argv[0], argv[5]);
+        exit(EXIT_FAILURE);
+    }
+
+    /* The copy is one-directional, so combined read/write is refused */
+    if ((flags & O_ACCMODE) == O_RDWR) {
+        fprintf(stderr, _("%s: cannot request read and write flags together\n"), argv[0]);
+        exit(EXIT_FAILURE);
+    }
+
+    if (runIO(path, flags, mode, offset, length) < 0)
+        goto error;
+
+    return 0;
+
+error:
+    err = virGetLastError();
+    if (err)
+        fprintf(stderr, "%s: %s\n", argv[0], err->message);
+    else
+        fprintf(stderr, _("%s: unknown failure with %s\n"), argv[0], path);
+    exit(EXIT_FAILURE);
+}
diff --git a/src/xen/xen_driver.c b/src/xen/xen_driver.c
index 1162f63..de304c1 100644
--- a/src/xen/xen_driver.c
+++ b/src/xen/xen_driver.c
@@ -2019,7 +2019,7 @@ xenUnifiedDomainOpenConsole(virDomainPtr dom,
         goto cleanup;
     }
 
-    if (virFDStreamOpenFile(st, chr->source.data.file.path, O_RDWR) < 0)
+    if (virFDStreamOpenFile(st, chr->source.data.file.path, 0, 0, O_RDWR) < 0)
         goto cleanup;
 
     ret = 0;
-- 
1.7.4




More information about the libvir-list mailing list