[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Libguestfs] [PATCH nbdkit] streaming plugin: Add support for a sliding window



[Sorry, can't use git-send-email at the moment ...]

These patches implement a sliding window for the streaming plugin[1]
in nbdkit.

I would like to be able to stream a filesystem from tools such as
'virt-make-fs'[2].  This is a fairly frequently requested feature.

Unfortunately:

(a) The patches make the code significantly more complex and therefore
likely to have bugs.

(b) They are not practically useful.  'parted' likes to write to the
beginning and end of a disk, even when creating a simple MBR, and of
course 'mkfs' scribbles the group headers across the whole disk when
creating a filesystem.

A simple window approach is obviously not sufficient.  A better
approach might be something like a sparse, size-limited map recording
writes at any point in the disk.  But that has the problem that you
don't know when you can commit a write to the stream -- some heuristic
would have to be used.

I'm posting them to the mailing list for the record and in case anyone
has any better ideas.

Rich.

[1] http://rwmj.wordpress.com/2014/10/14/streaming-nbd-server/#content
[2] http://libguestfs.org/virt-make-fs.1.html

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
Fedora Windows cross-compiler. Compile Windows programs, test, and
build Windows installers. Over 100 libraries supported.
http://fedoraproject.org/wiki/MinGW
>From be039f70da0c3ece9075724bf5ff29a45038dce5 Mon Sep 17 00:00:00 2001
From: "Richard W.M. Jones" <rjones redhat com>
Date: Tue, 14 Oct 2014 16:38:50 +0200
Subject: [PATCH 1/2] streaming: Implement sliding window and add window=SIZE
 parameter.

---
 plugins/streaming/nbdkit-streaming-plugin.pod |  17 +-
 plugins/streaming/streaming.c                 | 290 +++++++++++++++++++++-----
 2 files changed, 256 insertions(+), 51 deletions(-)

diff --git a/plugins/streaming/nbdkit-streaming-plugin.pod b/plugins/streaming/nbdkit-streaming-plugin.pod
index a21ed4f..635af69 100644
--- a/plugins/streaming/nbdkit-streaming-plugin.pod
+++ b/plugins/streaming/nbdkit-streaming-plugin.pod
@@ -6,7 +6,7 @@ nbdkit-streaming-plugin - nbdkit streaming plugin
 
 =head1 SYNOPSIS
 
- nbdkit streaming pipe=FILENAME [size=SIZE]
+ nbdkit streaming pipe=FILENAME [size=SIZE] [window=SIZE]
 
 =head1 DESCRIPTION
 
@@ -50,12 +50,23 @@ Whether you need to specify this parameter depends on the client.
 Some clients don't check the size and just write/stream, others do
 checks or calculations based on the apparent size.
 
+=item B<window=SIZE>
+
+Specify a sliding window of data, allowing limited seeking backwards
+and reads.  You can use any size specifier permitted by
+C<nbdkit_parse_size>, eg. C<window=1M>.
+
+Note that this is disabled (set to 0) by default, since enabling it
+causes writes to be delayed until the client moves the window forward
+or until nbdkit exits.
+
 =back
 
 =head1 TO DO
 
-This plugin would be much nicer if it supported the concept of a
-"window" of data, allowing limited reverse seeks and reads.
+Separate read and write windows would make more sense, allowing a
+large read window and a small write window.  The smaller (or zero)
+write window would mean that writes are not delayed.
 
 =head1 SEE ALSO
 
diff --git a/plugins/streaming/streaming.c b/plugins/streaming/streaming.c
index f58fa46..2d08803 100644
--- a/plugins/streaming/streaming.c
+++ b/plugins/streaming/streaming.c
@@ -41,20 +41,28 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <errno.h>
+#include <assert.h>
 
 #include <nbdkit-plugin.h>
 
+#define min(a,b) ((a)<(b)?(a):(b))
+
 static char *filename = NULL;
 static int fd = -1;
 
 /* In theory INT64_MAX, but it breaks qemu's NBD driver. */
 static int64_t size = INT64_MAX/2;
 
-/* Flag if we have entered the unrecoverable error state because of
- * a seek backwards.
+/* Flag if we have entered the unrecoverable error state because of a
+ * seek backwards beyond the window.
  */
 static int errorstate = 0;
 
+/* Window. */
+static int64_t window_max_size = 0; /* window= parameter */
+static int64_t window_size = 0;     /* current size */
+static char *window = NULL;
+
 /* Highest byte (+1) that has been written in the data stream. */
 static uint64_t highestwrite = 0;
 
@@ -73,6 +81,11 @@ streaming_config (const char *key, const char *value)
     if (size == -1)
       return -1;
   }
+  else if (strcmp (key, "window") == 0) {
+    window_max_size = nbdkit_parse_size (value);
+    if (window_max_size == -1)
+      return -1;
+  }
   else {
     nbdkit_error ("unknown parameter '%s'", key);
     return -1;
@@ -110,18 +123,10 @@ streaming_config_complete (void)
   return 0;
 }
 
-/* nbdkit is shutting down. */
-static void
-streaming_unload (void)
-{
-  if (fd >= 0)
-    close (fd);
-  free (filename);
-}
-
 #define streaming_config_help \
   "pipe=<FILENAME>     (required) The filename to serve.\n" \
-  "size=<SIZE>         (optional) Stream size."
+  "size=<SIZE>         (optional) Stream size.\n" \
+  "window=<SIZE>       (optional) Window size."
 
 /* Create the per-connection handle. */
 static void *
@@ -160,13 +165,66 @@ streaming_get_size (void *handle)
   return size;
 }
 
+static int
+xwrite (int fd, const char *buf, size_t n)
+{
+  ssize_t r;
+
+  while (n > 0) {
+    r = write (fd, buf, n);
+    if (r == -1) {
+      nbdkit_error ("write: %m");
+      return -1;
+    }
+    buf += r;
+    n -= r;
+  }
+  return 0;
+}
+
+static int
+xwrite_zeroes (int fd, size_t n)
+{
+  ssize_t r;
+  char buf[4096];
+
+  memset (buf, 0, sizeof buf);
+
+  while (n > 0) {
+    r = write (fd, buf, min (n, sizeof buf));
+    if (r == -1) {
+      nbdkit_error ("write: %m");
+      return -1;
+    }
+    n -= r;
+  }
+  return 0;
+}
+
+/*
+This diagram should help when trying to understand the pread and
+pwrite calls below.
+
+Note that we recursively split read and write calls to make the cases
+tractable.
+
+                           |<------- window_max_size ------->|
+                           |<---- window_size ----->|
+  +------------------------+------------------------+--------+----------
+  ^                        ^                        ^        ^
+  0                    windowstart         highestwrite   maxwindow
+
+ */
+
 /* Write data to the stream. */
 static int
 streaming_pwrite (void *handle, const void *buf,
                   uint32_t count, uint64_t offset)
 {
-  size_t n;
-  ssize_t r;
+  int r;
+  uint64_t windowstart;
+  uint64_t maxwindow;
+  int64_t delta;
 
   if (errorstate) {
     nbdkit_error ("unrecoverable error state");
@@ -174,63 +232,199 @@ streaming_pwrite (void *handle, const void *buf,
     return -1;
   }
 
-  if (offset < highestwrite) {
-    nbdkit_error ("client tried to seek backwards and write: the streaming plugin does not currently support this");
+  /* This just makes the recursive case easier to reason about. */
+  if (count == 0)
+    return 0;
+
+  windowstart = highestwrite - window_size;
+
+  if (offset < windowstart) {
+    nbdkit_error ("client seeked backwards > window size: you must increase the window size");
     errorstate = 1;
     errno = EIO;
     return -1;
   }
 
-  /* Need to write some zeroes. */
-  if (offset > highestwrite) {
-    int64_t size = offset - highestwrite;
-    char buf[4096];
-
-    memset (buf, 0, sizeof buf);
-
-    while (size > 0) {
-      n = size > sizeof buf ? sizeof buf : size;
-      r = write (fd, buf, n);
-      if (r == -1) {
-        nbdkit_error ("write: %m");
-        errorstate = 1;
-        return -1;
-      }
-      highestwrite += r;
-      size -= r;
-    }
+  /* Split writes across highestwrite and maxwindow boundaries.
+   * Splitting here means we do not have to deal with writes across
+   * the boundary in the code below.
+   */
+  if (offset < highestwrite && offset + count > highestwrite) {
+    uint64_t size = highestwrite - offset;
+
+    r = streaming_pwrite (handle, buf, size, offset);
+    if (r == -1)
+      return -1;
+    buf += size;
+    offset += size;
+    count -= size;
+    return streaming_pwrite (handle, buf, count, offset);
+  }
+
+  maxwindow = windowstart + window_max_size;
+
+  if (offset < maxwindow && offset + count > maxwindow) {
+    uint64_t size = maxwindow - offset;
+
+    r = streaming_pwrite (handle, buf, size, offset);
+    if (r == -1)
+      return -1;
+    buf += size;
+    offset += size;
+    count -= size;
+    return streaming_pwrite (handle, buf, count, offset);
+  }
+
+  /* Handle a write entirely within the current window. */
+  if (offset < highestwrite) {
+    uint64_t windowoffset = window_size - (highestwrite - offset);
+    memcpy (&window[windowoffset], buf, count);
+    return 0;
   }
 
-  /* Write the data. */
-  while (count > 0) {
-    r = write (fd, buf, count);
-    if (r == -1) {
-      nbdkit_error ("write: %m");
-      errorstate = 1;
+  /* A write after highestwrite but not larger than maxwindow causes
+   * the window to be extended but not moved.
+   */
+  if (offset < maxwindow) {
+    uint64_t new_highestwrite = offset + count;
+    uint64_t new_size = new_highestwrite - windowstart;
+    char *new_window;
+
+    new_window = realloc (window, new_size);
+    if (new_window == NULL) {
+      nbdkit_error ("realloc: %m");
       return -1;
     }
-    buf += r;
-    highestwrite += r;
-    count -= r;
+    window = new_window;
+    /* Make sure the extended window is zeroes to start with. */
+    memset (&window[window_size], 0, new_size - window_size);
+    highestwrite = new_highestwrite;
+    /* Copy the buffer to the new window. */
+    memcpy (&window[offset - windowstart], buf, count);
+    return 0;
+  }
+
+  /* Split writes after maxwindow at highestwrite + window_max_size. */
+  if (offset < highestwrite + window_max_size &&
+      offset + count > highestwrite + window_max_size) {
+    uint64_t size = highestwrite + window_max_size - offset;
+
+    r = streaming_pwrite (handle, buf, size, offset);
+    if (r == -1)
+      return -1;
+    buf += size;
+    offset += size;
+    count -= size;
+    return streaming_pwrite (handle, buf, count, offset);
+  }
+
+  /* Any write here is going to cause the window to move.  Delta is
+   * the amount by which the window will move (NB: might be greater
+   * than the window size).
+   */
+  delta = offset + count - highestwrite;
+
+  if (delta <= window_size) {
+    /* Write out the oldest part of the window. */
+    if (xwrite (fd, window, delta) == -1)
+      return -1;
+
+    /* Move the data in the window down. */
+    memmove (window, window + delta, window_size - delta);
+
+    /* Copy the buffer to the new window. */
+    memcpy (window + window_size - count, buf, count);
+    highestwrite += delta;
+    return 0;
   }
 
-  return 0;
+  /* The window will move by more than a single window size.  Write out
+   * the whole of the old window, then write zeroes, then continue the
+   * write.
+   */
+  if (xwrite (fd, window, window_size) == -1)
+    return -1;
+  memset (window, 0, window_size);
+
+  if (xwrite_zeroes (fd, delta - window_size) == -1)
+    return -1;
+
+  highestwrite += delta - window_size;
+
+  return streaming_pwrite (handle, buf, count, offset);
 }
 
 /* Read data back from the stream. */
 static int
 streaming_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
 {
+  uint64_t windowstart;
+  int r;
+
   if (errorstate) {
     nbdkit_error ("unrecoverable error state");
     errno = EIO;
     return -1;
   }
 
-  nbdkit_error ("client tried to read: the streaming plugin does not currently support this");
-  errorstate = 1;
-  errno = EIO;
-  return -1;
+  /* This just makes the recursive case easier to reason about. */
+  if (count == 0)
+    return 0;
+
+  windowstart = highestwrite - window_size;
+
+  if (offset < windowstart) {
+    nbdkit_error ("client seeked backwards > window size: you must increase the window size");
+    errorstate = 1;
+    errno = EIO;
+    return -1;
+  }
+
+  /* Split reads across highestwrite boundary.  Splitting here means
+   * we do not have to deal with reads across the boundary in the code
+   * below.
+   */
+  if (offset < highestwrite && offset + count > highestwrite) {
+    uint64_t size = highestwrite - offset;
+
+    r = streaming_pread (handle, buf, size, offset);
+    if (r == -1)
+      return -1;
+    buf += size;
+    offset += size;
+    count -= size;
+    return streaming_pread (handle, buf, count, offset);
+  }
+
+  /* Handle a read entirely within the window by simply reading the
+   * window contents.
+   */
+  if (offset < highestwrite) {
+    uint64_t windowoffset = window_size - (highestwrite - offset);
+    memcpy (buf, &window[windowoffset], count);
+    return 0;
+  }
+
+  /* Else any read ahead of the current highest write is returned as
+   * all zeroes.
+   */
+  memset (buf, 0, count);
+  return 0;
+}
+
+/* nbdkit is shutting down - the rest of the window should be written out. */
+static void
+streaming_unload (void)
+{
+  if (fd >= 0) {
+    /* XXX impossible to report an error to the client here */
+    xwrite (fd, window, window_size);
+
+    close (fd);
+  }
+
+  free (window);
+  free (filename);
 }
 
 static struct nbdkit_plugin plugin = {
-- 
2.0.4

>From 0c4ffccc7258dcff94cc40abcf470a3c5ad788c3 Mon Sep 17 00:00:00 2001
From: "Richard W.M. Jones" <rjones redhat com>
Date: Tue, 14 Oct 2014 14:31:08 +0200
Subject: [PATCH 2/2] tests: Enable streaming test.

---
 plugins/streaming/streaming.c | 13 +++++++++++--
 tests/Makefile.am             | 17 +++++------------
 tests/test-streaming.c        | 16 +++++++---------
 3 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/plugins/streaming/streaming.c b/plugins/streaming/streaming.c
index 2d08803..da4de63 100644
--- a/plugins/streaming/streaming.c
+++ b/plugins/streaming/streaming.c
@@ -35,6 +35,8 @@
 
 #include <stdio.h>
 #include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
 #include <string.h>
 #include <fcntl.h>
 #include <unistd.h>
@@ -232,6 +234,10 @@ streaming_pwrite (void *handle, const void *buf,
     return -1;
   }
 
+  nbdkit_debug ("pwrite: offset=%" PRIi64 " count=%" PRIu32
+                " highestwrite=%" PRIu64,
+                offset, count, highestwrite);
+
   /* This just makes the recursive case easier to reason about. */
   if (count == 0)
     return 0;
@@ -239,7 +245,8 @@ streaming_pwrite (void *handle, const void *buf,
   windowstart = highestwrite - window_size;
 
   if (offset < windowstart) {
-    nbdkit_error ("client seeked backwards > window size: you must increase the window size");
+    nbdkit_error ("pwrite: client backwards seek > window size: you must increase the window size (highestwrite=%" PRIu64 ", window_size=%" PRIi64 ")",
+                  highestwrite, window_size);
     errorstate = 1;
     errno = EIO;
     return -1;
@@ -298,6 +305,7 @@ streaming_pwrite (void *handle, const void *buf,
     window = new_window;
     /* Make sure the extended window is zeroes to start with. */
     memset (&window[window_size], 0, new_size - window_size);
+    window_size = new_size;
     highestwrite = new_highestwrite;
     /* Copy the buffer to the new window. */
     memcpy (&window[offset - windowstart], buf, count);
@@ -374,7 +382,8 @@ streaming_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
   windowstart = highestwrite - window_size;
 
   if (offset < windowstart) {
-    nbdkit_error ("client seeked backwards > window size: you must increase the window size");
+    nbdkit_error ("pread: client backwards seek > window size: you must increase the window size (highestwrite=%" PRIu64 ", window_size=%" PRIi64 ")",
+                  highestwrite, window_size);
     errorstate = 1;
     errno = EIO;
     return -1;
diff --git a/tests/Makefile.am b/tests/Makefile.am
index a50e26b..cccd45b 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -172,18 +172,11 @@ test_python_LDADD = libtest.la $(LIBGUESTFS_LIBS)
 endif
 
 # streaming plugin test.
+check_PROGRAMS += test-streaming
+TESTS += test-streaming
 
-# This is disabled at the moment because the libguestfs appliance
-# kernel tries to read from the device (eg to read the partition
-# table) and the current streaming plugin cannot handle this.
-# Implementing a sliding window in the plugin would fix this. (XXX)
-EXTRA_DIST += test-streaming.c
-
-#check_PROGRAMS += test-streaming
-#TESTS += test-streaming
-#
-#test_streaming_SOURCES = test-streaming.c test.h
-#test_streaming_CFLAGS = $(WARNINGS_CFLAGS) $(LIBGUESTFS_CFLAGS)
-#test_streaming_LDADD = libtest.la $(LIBGUESTFS_LIBS)
+test_streaming_SOURCES = test-streaming.c test.h
+test_streaming_CFLAGS = $(WARNINGS_CFLAGS) $(LIBGUESTFS_CFLAGS)
+test_streaming_LDADD = libtest.la $(LIBGUESTFS_LIBS)
 
 endif
diff --git a/tests/test-streaming.c b/tests/test-streaming.c
index 1631c19..4610fb9 100644
--- a/tests/test-streaming.c
+++ b/tests/test-streaming.c
@@ -48,7 +48,7 @@
 
 #include "test.h"
 
-static char data[4096];
+static char data[1024];
 
 int
 main (int argc, char *argv[])
@@ -69,6 +69,8 @@ main (int argc, char *argv[])
 
   if (test_start_nbdkit (NBDKIT_PLUGIN ("streaming"),
                          "pipe=streaming.fifo",
+                         "size=128k",
+                         "window=128k",
                          NULL) == -1)
     exit (EXIT_FAILURE);
 
@@ -121,14 +123,10 @@ main (int argc, char *argv[])
     exit (EXIT_FAILURE);
 
   /* Write linearly to the virtual disk. */
-  for (i = 0; i < 10; ++i) {
-    memset (data, i+1, sizeof data);
-
-    /* Note that we deliberately skip forwards, in order to
-     * exercise seeking code in the streaming plugin.
-     */
+  memset (data, 1, sizeof data);
+  for (i = 0; i < 32; ++i) {
     guestfs_pwrite_device (g, "/dev/sda", data, sizeof data,
-                           (2 * i) * sizeof data);
+                           i * sizeof data);
   }
 
   if (guestfs_shutdown (g) == -1)
@@ -148,7 +146,7 @@ main (int argc, char *argv[])
   }
   md5[32] = '\0';
 
-  if (strcmp (md5, "0123456789abcdef0123456789abcdef") != 0) {
+  if (strcmp (md5, "51ae9fa5fb90e9d51c4f1b4260285c99") != 0) {
     fprintf (stderr, "unexpected hash: %s\n", md5);
     exit (EXIT_FAILURE);
   }
-- 
2.0.4


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]