[libvirt] [PATCH 34/38] fdstream: Implement sparse stream

Michal Privoznik mprivozn at redhat.com
Thu Apr 13 13:31:42 UTC 2017


Implement virStreamSkip and virStreamInData callbacks. These
callbacks do no magic, just skip a hole or detect whether we are
in a data section of a file or in a hole and how much bytes can
we read until section changes.

Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
---
 src/storage/storage_util.c |   4 +-
 src/util/virfdstream.c     | 234 +++++++++++++++++++++++++++++++++++++++++----
 src/util/virfdstream.h     |   1 +
 3 files changed, 216 insertions(+), 23 deletions(-)

diff --git a/src/storage/storage_util.c b/src/storage/storage_util.c
index a2d89af..3576435 100644
--- a/src/storage/storage_util.c
+++ b/src/storage/storage_util.c
@@ -2427,7 +2427,7 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED,
     /* Not using O_CREAT because the file is required to already exist at
      * this point */
     ret = virFDStreamOpenBlockDevice(stream, target_path,
-                                     offset, len, O_WRONLY);
+                                     offset, len, false, O_WRONLY);
 
  cleanup:
     VIR_FREE(path);
@@ -2465,7 +2465,7 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED,
     }
 
     ret = virFDStreamOpenBlockDevice(stream, target_path,
-                                     offset, len, O_RDONLY);
+                                     offset, len, false, O_RDONLY);
 
  cleanup:
     VIR_FREE(path);
diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index efd9199..e9b5962 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -51,6 +51,7 @@ VIR_LOG_INIT("fdstream");
 
 typedef enum {
     VIR_FDSTREAM_MSG_TYPE_DATA,
+    VIR_FDSTREAM_MSG_TYPE_SKIP,
 } virFDStreamMsgType;
 
 typedef struct _virFDStreamMsg virFDStreamMsg;
@@ -66,6 +67,9 @@ struct _virFDStreamMsg {
             size_t len;
             size_t offset;
         } data;
+        struct {
+            size_t len;
+        } skip;
     } stream;
 };
 
@@ -175,6 +179,9 @@ virFDStreamMsgFree(virFDStreamMsgPtr msg)
     case VIR_FDSTREAM_MSG_TYPE_DATA:
         VIR_FREE(msg->stream.data.buf);
         break;
+    case VIR_FDSTREAM_MSG_TYPE_SKIP:
+        /* nada */
+        break;
     }
 
     VIR_FREE(msg);
@@ -361,6 +368,7 @@ typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
 struct _virFDStreamThreadData {
     virStreamPtr st;
     size_t length;
+    bool sparse;
     int fdin;
     char *fdinname;
     int fdout;
@@ -383,32 +391,66 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
 
 static ssize_t
 virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
+                        bool sparse,
                         const int fdin,
                         const char *fdinname,
+                        size_t *dataLen,
                         size_t buflen)
 {
     virFDStreamMsgPtr msg = NULL;
+    int inData = 0;
+    unsigned long long sectionLen = 0;
     char *buf = NULL;
     ssize_t got;
 
+    if (sparse && *dataLen == 0) {
+        if (virFileInData(fdin, &inData, &sectionLen) < 0)
+            goto error;
+
+        if (inData)
+            *dataLen = sectionLen;
+    }
+
     if (VIR_ALLOC(msg) < 0)
         goto error;
 
-    if (VIR_ALLOC_N(buf, buflen) < 0)
-        goto error;
-
-    if ((got = saferead(fdin, buf, buflen)) < 0) {
-        virReportSystemError(errno,
-                             _("Unable to read %s"),
-                             fdinname);
-        goto error;
+    if (sparse && *dataLen == 0) {
+        msg->type = VIR_FDSTREAM_MSG_TYPE_SKIP;
+        msg->stream.skip.len = sectionLen;
+        got = sectionLen;
+
+        /* HACK. The message queue is one directional. So caller
+         * cannot make us skip the hole. Do that for them instead. */
+        if (sectionLen &&
+            lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) {
+            virReportSystemError(errno,
+                                 _("unable to seek in %s"),
+                                 fdinname);
+            goto error;
+        }
+    } else {
+        if (sparse &&
+            buflen > *dataLen)
+            buflen = *dataLen;
+
+        if (VIR_ALLOC_N(buf, buflen) < 0)
+            goto error;
+
+        if ((got = saferead(fdin, buf, buflen)) < 0) {
+            virReportSystemError(errno,
+                                 _("Unable to read %s"),
+                                 fdinname);
+            goto error;
+        }
+
+        msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+        msg->stream.data.buf = buf;
+        msg->stream.data.len = got;
+        buf = NULL;
+        if (sparse)
+            *dataLen -= got;
     }
 
-    msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
-    msg->stream.data.buf = buf;
-    msg->stream.data.len = got;
-    buf = NULL;
-
     virFDStreamMsgQueuePush(fdst, msg);
     msg = NULL;
 
@@ -423,11 +465,13 @@ virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
 
 static ssize_t
 virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
+                         bool sparse,
                          const int fdout,
                          const char *fdoutname)
 {
     ssize_t got;
     virFDStreamMsgPtr msg = fdst->msg;
+    off_t off;
     bool pop = false;
 
     switch (msg->type) {
@@ -446,6 +490,32 @@ virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
 
         pop = msg->stream.data.offset == msg->stream.data.len;
         break;
+
+    case VIR_FDSTREAM_MSG_TYPE_SKIP:
+        if (!sparse) {
+            virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                           _("unexpected stream skip"));
+            return -1;
+        }
+
+        got = msg->stream.skip.len;
+        off = lseek(fdout, got, SEEK_CUR);
+        if (off == (off_t) -1) {
+            virReportSystemError(errno,
+                                 _("unable to seek in %s"),
+                                 fdoutname);
+            return -1;
+        }
+
+        if (ftruncate(fdout, off) < 0) {
+            virReportSystemError(errno,
+                                 _("unable to truncate %s"),
+                                 fdoutname);
+            return -1;
+        }
+
+        pop = true;
+        break;
     }
 
     if (pop) {
@@ -463,6 +533,7 @@ virFDStreamThread(void *opaque)
     virFDStreamThreadDataPtr data = opaque;
     virStreamPtr st = data->st;
     size_t length = data->length;
+    bool sparse = data->sparse;
     int fdin = data->fdin;
     char *fdinname = data->fdinname;
     int fdout = data->fdout;
@@ -471,6 +542,7 @@ virFDStreamThread(void *opaque)
     bool doRead = fdst->threadDoRead;
     size_t buflen = 256 * 1024;
     size_t total = 0;
+    size_t dataLen = 0;
 
     virObjectRef(fdst);
     virObjectLock(fdst);
@@ -505,9 +577,9 @@ virFDStreamThread(void *opaque)
         }
 
         if (doRead)
-            got = virFDStreamThreadDoRead(fdst, fdin, fdinname, buflen);
+            got = virFDStreamThreadDoRead(fdst, sparse, fdin, fdinname, &dataLen, buflen);
         else
-            got = virFDStreamThreadDoWrite(fdst, fdout, fdoutname);
+            got = virFDStreamThreadDoWrite(fdst, sparse, fdout, fdoutname);
 
         if (got < 0)
             goto error;
@@ -773,6 +845,14 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
             }
         }
 
+        /* Shortcut, if the stream is in the trailing hole,
+         * return 0 immediately. */
+        if (msg->type == VIR_FDSTREAM_MSG_TYPE_SKIP &&
+            msg->stream.skip.len == 0) {
+            ret = 0;
+            goto cleanup;
+        }
+
         if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
             /* Nope, nope, I'm outta here */
             virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
@@ -823,11 +903,120 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
 }
 
 
+static int
+virFDStreamSkip(virStreamPtr st,
+                unsigned long long length)
+{
+    virFDStreamDataPtr fdst = st->privateData;
+    virFDStreamMsgPtr msg = NULL;
+    off_t off;
+    int ret = -1;
+
+    virObjectLock(fdst);
+    if (fdst->length) {
+        if (length > fdst->length - fdst->offset)
+            length = fdst->length - fdst->offset;
+        fdst->offset += length;
+    }
+
+    if (fdst->thread) {
+        /* Things are a bit complicated here. But bear with me. If FDStream is
+         * in a read mode, then if the message at the queue head is SKIP, just
+         * pop it. The thread has lseek()-ed anyway. If however, the FDStream
+         * is in write mode, then tell the thread to do the lseek() for us.
+         * Under no circumstances we can do the lseek() ourselves here. We
+         * might mess up file position for the thread. */
+        if (fdst->threadDoRead) {
+            msg = fdst->msg;
+            if (msg->type != VIR_FDSTREAM_MSG_TYPE_SKIP) {
+                virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                               _("Invalid stream skip"));
+                goto cleanup;
+            }
+
+            virFDStreamMsgQueuePop(fdst);
+        } else {
+            if (VIR_ALLOC(msg) < 0)
+                goto cleanup;
+
+            msg->type = VIR_FDSTREAM_MSG_TYPE_SKIP;
+            msg->stream.skip.len = length;
+            virFDStreamMsgQueuePush(fdst, msg);
+            msg = NULL;
+        }
+    } else {
+        off = lseek(fdst->fd, length, SEEK_CUR);
+        if (off == (off_t) -1) {
+            virReportSystemError(errno, "%s",
+                                 _("unable to seek"));
+            goto cleanup;
+        }
+
+        if (ftruncate(fdst->fd, off) < 0) {
+            virReportSystemError(errno, "%s",
+                                 _("unable to truncate"));
+            goto cleanup;
+        }
+    }
+
+    ret = 0;
+ cleanup:
+    virObjectUnlock(fdst);
+    virFDStreamMsgFree(msg);
+    return ret;
+}
+
+
+static int
+virFDStreamInData(virStreamPtr st,
+                  int *inData,
+                  unsigned long long *length)
+{
+    virFDStreamDataPtr fdst = st->privateData;
+    int ret = -1;
+
+    virObjectLock(fdst);
+
+    if (fdst->thread) {
+        virFDStreamMsgPtr msg;
+
+        while (!(msg = fdst->msg)) {
+            if (fdst->threadQuit) {
+                *inData = *length = 0;
+                ret = 0;
+                goto cleanup;
+            } else {
+                virObjectUnlock(fdst);
+                virCondSignal(&fdst->threadCond);
+                virObjectLock(fdst);
+            }
+        }
+
+        if (msg->type == VIR_FDSTREAM_MSG_TYPE_DATA) {
+            *inData = 1;
+            *length = msg->stream.data.len - msg->stream.data.offset;
+        } else {
+            *inData = 0;
+            *length = msg->stream.skip.len;
+        }
+        ret = 0;
+    } else {
+        ret = virFileInData(fdst->fd, inData, length);
+    }
+
+ cleanup:
+    virObjectUnlock(fdst);
+    return ret;
+}
+
+
 static virStreamDriver virFDStreamDrv = {
     .streamSend = virFDStreamWrite,
     .streamRecv = virFDStreamRead,
     .streamFinish = virFDStreamClose,
     .streamAbort = virFDStreamAbort,
+    .streamSkip = virFDStreamSkip,
+    .streamInData = virFDStreamInData,
     .streamEventAddCallback = virFDStreamAddCallback,
     .streamEventUpdateCallback = virFDStreamUpdateCallback,
     .streamEventRemoveCallback = virFDStreamRemoveCallback
@@ -969,7 +1158,8 @@ virFDStreamOpenFileInternal(virStreamPtr st,
                             unsigned long long length,
                             int oflags,
                             int mode,
-                            bool forceIOHelper)
+                            bool forceIOHelper,
+                            bool sparse)
 {
     int fd = -1;
     struct stat sb;
@@ -1026,6 +1216,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
 
         threadData->st = virObjectRef(st);
         threadData->length = length;
+        threadData->sparse = sparse;
 
         if ((oflags & O_ACCMODE) == O_RDONLY) {
             threadData->fdin = fd;
@@ -1067,7 +1258,7 @@ int virFDStreamOpenFile(virStreamPtr st,
     }
     return virFDStreamOpenFileInternal(st, path,
                                        offset, length,
-                                       oflags, 0, false);
+                                       oflags, 0, false, false);
 }
 
 int virFDStreamCreateFile(virStreamPtr st,
@@ -1080,7 +1271,7 @@ int virFDStreamCreateFile(virStreamPtr st,
     return virFDStreamOpenFileInternal(st, path,
                                        offset, length,
                                        oflags | O_CREAT, mode,
-                                       false);
+                                       false, false);
 }
 
 #ifdef HAVE_CFMAKERAW
@@ -1096,7 +1287,7 @@ int virFDStreamOpenPTY(virStreamPtr st,
     if (virFDStreamOpenFileInternal(st, path,
                                     offset, length,
                                     oflags | O_CREAT, 0,
-                                    false) < 0)
+                                    false, false) < 0)
         return -1;
 
     fdst = st->privateData;
@@ -1133,7 +1324,7 @@ int virFDStreamOpenPTY(virStreamPtr st,
     return virFDStreamOpenFileInternal(st, path,
                                        offset, length,
                                        oflags | O_CREAT, 0,
-                                       false);
+                                       false, false);
 }
 #endif /* !HAVE_CFMAKERAW */
 
@@ -1141,11 +1332,12 @@ int virFDStreamOpenBlockDevice(virStreamPtr st,
                                const char *path,
                                unsigned long long offset,
                                unsigned long long length,
+                               bool sparse,
                                int oflags)
 {
     return virFDStreamOpenFileInternal(st, path,
                                        offset, length,
-                                       oflags, 0, true);
+                                       oflags, 0, true, sparse);
 }
 
 int virFDStreamSetInternalCloseCb(virStreamPtr st,
diff --git a/src/util/virfdstream.h b/src/util/virfdstream.h
index 34c4c3f..887c991 100644
--- a/src/util/virfdstream.h
+++ b/src/util/virfdstream.h
@@ -59,6 +59,7 @@ int virFDStreamOpenBlockDevice(virStreamPtr st,
                                const char *path,
                                unsigned long long offset,
                                unsigned long long length,
+                               bool sparse,
                                int oflags);
 
 int virFDStreamSetInternalCloseCb(virStreamPtr st,
-- 
2.10.2




More information about the libvir-list mailing list