[libvirt] [PATCH 3/3] esx: Add volume upload and download to the storage driver

Matthias Bolte matthias.bolte at googlemail.com
Mon Jul 2 21:44:53 UTC 2012


This requires new flags for the public API functions, because a
VMDK volume can have metadata and content in separate files. This
is not yet supported by the libvirt storage API as it currently
assumes that a single file represents a volume. At least it does
so in case of the volume upload and download functions.

For example, the default VMDK structure created by an ESX server
is a volume.vmdk file (containing metadata in plaintext key/value
form) and a volume-flat.vmdk (containing the actual content of the
volume). The volume.vmdk contains the name of its content file.
The volume.vmdk is the one that represents the volume to the ESX
server and is the one that is specified in datastore paths. The
volume-flat.vmdk is basically an implementation detail here.

There are also one-file VMDK volumes, as created by qemu-img when
targeting the VMDK format. Such VMDK volumes starts with a 'KDMV'
magic and contain metadata and content.

The new METADATA and CONTENT flags allow a caller to tell the ESX
storage driver which part of a volume should be up- or downloaded
in case of a two-file VMDK. In case of a one-file VMDK both or no
flags must be given. In case of non-VMDK volumes such as ISO or
floppy images no flags must be specified.

If the CONTENT flag is given the the ESX driver reads the metadata
file in the datastore to figure out the name of the content file.
This means that for two-file VMDKs the matadata file has to be
uploaded first.

The libcurl based stream driver cannot use a libcurl easy handle
alone because curl_easy_perform would do the whole transfer before
it returns. But there is no place in the stream handling concept
that would allow for such a call to be made. The stream is driven
by esxStream(Send|Recv) which is probably called multiple times to
send/receive the stream in chunks. Therefore, a libcurl multi handle
is used that allows to perform the data transfer in chunks and also
allows to support non-blocking operations. Although, the stream
callbacks for non-blocking operations are not implemented yet.
---
 include/libvirt/libvirt.h.in |   14 +
 src/Makefile.am              |    1 +
 src/esx/esx_storage_driver.c |  268 ++++++++++++++++++
 src/esx/esx_stream.c         |  610 ++++++++++++++++++++++++++++++++++++++++++
 src/esx/esx_stream.h         |   33 +++
 src/esx/esx_util.c           |   82 ++++++
 src/esx/esx_util.h           |    2 +
 src/esx/esx_vi.c             |   29 ++
 src/esx/esx_vi.h             |    2 +
 tools/virsh.c                |   22 ++-
 10 files changed, 1061 insertions(+), 2 deletions(-)
 create mode 100644 src/esx/esx_stream.c
 create mode 100644 src/esx/esx_stream.h

diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in
index 6e8d5dd..d11ed68 100644
--- a/include/libvirt/libvirt.h.in
+++ b/include/libvirt/libvirt.h.in
@@ -2562,11 +2562,25 @@ virStorageVolPtr        virStorageVolCreateXMLFrom      (virStoragePoolPtr pool,
                                                          const char *xmldesc,
                                                          virStorageVolPtr clonevol,
                                                          unsigned int flags);
+
+typedef enum {
+    VIR_STORAGE_VOL_DOWNLOAD_DEFAULT  = 0,      /* default behavior */
+    VIR_STORAGE_VOL_DOWNLOAD_METADATA = 1 << 0, /* download metadata only */
+    VIR_STORAGE_VOL_DOWNLOAD_CONTENT  = 1 << 1, /* download content only */
+} virStorageVolDownloadFlags;
+
 int                     virStorageVolDownload           (virStorageVolPtr vol,
                                                          virStreamPtr stream,
                                                          unsigned long long offset,
                                                          unsigned long long length,
                                                          unsigned int flags);
+
+typedef enum {
+    VIR_STORAGE_VOL_UPLOAD_DEFAULT  = 0,      /* default behavior */
+    VIR_STORAGE_VOL_UPLOAD_METADATA = 1 << 0, /* upload metadata only */
+    VIR_STORAGE_VOL_UPLOAD_CONTENT  = 1 << 1, /* upload content only */
+} virStorageVolUploadFlags;
+
 int                     virStorageVolUpload             (virStorageVolPtr vol,
                                                          virStreamPtr stream,
                                                          unsigned long long offset,
diff --git a/src/Makefile.am b/src/Makefile.am
index 2309984..db8fb91 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -430,6 +430,7 @@ ESX_DRIVER_SOURCES =						\
 		esx/esx_secret_driver.c esx/esx_secret_driver.h		\
 		esx/esx_nwfilter_driver.c esx/esx_nwfilter_driver.h		\
 		esx/esx_util.c esx/esx_util.h			\
+		esx/esx_stream.c esx/esx_stream.h			\
 		esx/esx_vi.c esx/esx_vi.h			\
 		esx/esx_vi_methods.c esx/esx_vi_methods.h	\
 		esx/esx_vi_types.c esx/esx_vi_types.h
diff --git a/src/esx/esx_storage_driver.c b/src/esx/esx_storage_driver.c
index 9b64891..40f9471 100644
--- a/src/esx/esx_storage_driver.c
+++ b/src/esx/esx_storage_driver.c
@@ -38,6 +38,7 @@
 #include "esx_vi.h"
 #include "esx_vi_methods.h"
 #include "esx_util.h"
+#include "esx_stream.h"
 
 #define VIR_FROM_THIS VIR_FROM_ESX
 
@@ -1381,6 +1382,271 @@ esxStorageVolumeCreateXMLFrom(virStoragePoolPtr pool, const char *xmldesc,
 
 
 static int
+esxStorageVolumeDownload(virStorageVolPtr volume, virStreamPtr stream,
+                         unsigned long long offset,
+                         unsigned long long length,
+                         unsigned int flags)
+{
+    int result = -1;
+    esxPrivate *priv = volume->conn->storagePrivateData;
+    virBuffer buffer = VIR_BUFFER_INITIALIZER;
+    char *url = NULL;
+    char *magic = NULL;
+    unsigned long long magic_length = 4;
+    char *metadata = NULL;
+    unsigned long long metadata_length = 0;
+    char *contentName = NULL;
+    char *directoryName = NULL;
+
+    virCheckFlags(VIR_STORAGE_VOL_DOWNLOAD_METADATA |
+                  VIR_STORAGE_VOL_DOWNLOAD_CONTENT, -1);
+
+    /* Build URL */
+    virBufferAsprintf(&buffer, "%s://%s:%d/folder/", priv->parsedUri->transport,
+                      volume->conn->uri->server, volume->conn->uri->port);
+    virBufferURIEncodeString(&buffer, volume->name);
+    virBufferAddLit(&buffer, "?dcPath=");
+    virBufferURIEncodeString(&buffer, priv->primary->datacenterPath);
+    virBufferAddLit(&buffer, "&dsName=");
+    virBufferURIEncodeString(&buffer, volume->pool);
+
+    if (virBufferError(&buffer)) {
+        virReportOOMError();
+        goto cleanup;
+    }
+
+    url = virBufferContentAndReset(&buffer);
+
+    if (virFileHasSuffix(volume->name, ".vmdk")) {
+        /* Download VMDK magic to check VMDK type */
+        if (esxVI_CURL_Download(priv->primary->curl, url, &magic, 0,
+                                &magic_length) < 0) {
+            goto cleanup;
+        }
+
+        if (magic_length < 4) {
+            ESX_ERROR(VIR_ERR_INTERNAL_ERROR, "%s", _("Volume is too small"));
+            goto cleanup;
+        }
+
+        if (STREQLEN(magic, "KDMV", 4)) {
+            /* It's a one-file VMDK with metadata and content */
+            if (flags != VIR_STORAGE_VOL_DOWNLOAD_DEFAULT &&
+                flags != (VIR_STORAGE_VOL_DOWNLOAD_METADATA |
+                          VIR_STORAGE_VOL_DOWNLOAD_CONTENT)) {
+                ESX_ERROR(VIR_ERR_INVALID_ARG, "%s",
+                          _("Non or both of metadata and content flag is required for this volume"));
+                goto cleanup;
+            }
+        } else {
+            /* It's a two-file VMDK, this is the metadata file */
+            if (flags != VIR_STORAGE_VOL_DOWNLOAD_METADATA &&
+                flags != VIR_STORAGE_VOL_DOWNLOAD_CONTENT) {
+                ESX_ERROR(VIR_ERR_INVALID_ARG, "%s",
+                          _("Either metadata or content flag is required for this volume"));
+                goto cleanup;
+            }
+
+            if (flags == VIR_STORAGE_VOL_DOWNLOAD_CONTENT) {
+                /* Download full metadata */
+                metadata_length = 0;
+
+                if (esxVI_CURL_Download(priv->primary->curl, url, &metadata, 0,
+                                        &metadata_length) < 0) {
+                    goto cleanup;
+                }
+
+                contentName = esxUtil_ParseVMDKMetadata(metadata, metadata_length);
+
+                if (contentName == NULL) {
+                    goto cleanup;
+                }
+
+                /* Build content URL */
+                virBufferAsprintf(&buffer, "%s://%s:%d/folder/",
+                                  priv->parsedUri->transport,
+                                  volume->conn->uri->server,
+                                  volume->conn->uri->port);
+
+                directoryName = strrchr(volume->name, '/');
+
+                if (directoryName != NULL) {
+                    directoryName = strndup(volume->name,
+                                            directoryName - volume->name + 1);
+
+                    if (directoryName == NULL) {
+                        virReportOOMError();
+                        goto cleanup;
+                    }
+
+                    virBufferURIEncodeString(&buffer, directoryName);
+                }
+
+                virBufferURIEncodeString(&buffer, contentName);
+                virBufferAddLit(&buffer, "?dcPath=");
+                virBufferURIEncodeString(&buffer, priv->primary->datacenterPath);
+                virBufferAddLit(&buffer, "&dsName=");
+                virBufferURIEncodeString(&buffer, volume->pool);
+
+                if (virBufferError(&buffer)) {
+                    virReportOOMError();
+                    goto cleanup;
+                }
+
+                VIR_FREE(url);
+                url = virBufferContentAndReset(&buffer);
+            }
+        }
+    } else {
+        if (flags != VIR_STORAGE_VOL_DOWNLOAD_DEFAULT) {
+            ESX_ERROR(VIR_ERR_INVALID_ARG, "%s",
+                      _("Invalid flag combination for this volume"));
+            goto cleanup;
+        }
+    }
+
+    if (esxStreamOpenDownload(stream, priv, url, offset, length) < 0) {
+        goto cleanup;
+    }
+
+    result = 0;
+
+  cleanup:
+    if (url == NULL) {
+        virBufferFreeAndReset(&buffer);
+    }
+
+    VIR_FREE(url);
+    VIR_FREE(magic);
+    VIR_FREE(metadata);
+    VIR_FREE(contentName);
+    VIR_FREE(directoryName);
+
+    return result;
+}
+
+
+
+static int
+esxStorageVolumeUpload(virStorageVolPtr volume, virStreamPtr stream,
+                       unsigned long long offset,
+                       unsigned long long length,
+                       unsigned int flags)
+{
+    int result = -1;
+    esxPrivate *priv = volume->conn->storagePrivateData;
+    virBuffer buffer = VIR_BUFFER_INITIALIZER;
+    char *url = NULL;
+    char *metadata = NULL;
+    unsigned long long metadata_length = 0;
+    char *contentName = NULL;
+    char *directoryName = NULL;
+
+    virCheckFlags(VIR_STORAGE_VOL_UPLOAD_METADATA |
+                  VIR_STORAGE_VOL_UPLOAD_CONTENT, -1);
+
+    if (offset != 0 || length != 0) {
+        ESX_ERROR(VIR_ERR_INVALID_ARG, "%s",
+                  _("Partial volume upload is not supported"));
+        return -1;
+    }
+
+    /* Build URL */
+    virBufferAsprintf(&buffer, "%s://%s:%d/folder/", priv->parsedUri->transport,
+                      volume->conn->uri->server, volume->conn->uri->port);
+    virBufferURIEncodeString(&buffer, volume->name);
+    virBufferAddLit(&buffer, "?dcPath=");
+    virBufferURIEncodeString(&buffer, priv->primary->datacenterPath);
+    virBufferAddLit(&buffer, "&dsName=");
+    virBufferURIEncodeString(&buffer, volume->pool);
+
+    if (virBufferError(&buffer)) {
+        virReportOOMError();
+        goto cleanup;
+    }
+
+    url = virBufferContentAndReset(&buffer);
+
+    if (virFileHasSuffix(volume->name, ".vmdk")) {
+        if (flags == VIR_STORAGE_VOL_UPLOAD_CONTENT) {
+            /* Download metadata */
+            metadata_length = 0;
+
+            if (esxVI_CURL_Download(priv->primary->curl, url, &metadata, 0,
+                                    &metadata_length) < 0) {
+                goto cleanup;
+            }
+
+            contentName = esxUtil_ParseVMDKMetadata(metadata, metadata_length);
+
+            if (contentName == NULL) {
+                goto cleanup;
+            }
+
+            /* Build content URL */
+            virBufferAsprintf(&buffer, "%s://%s:%d/folder/",
+                              priv->parsedUri->transport,
+                              volume->conn->uri->server,
+                              volume->conn->uri->port);
+
+            directoryName = strrchr(volume->name, '/');
+
+            if (directoryName != NULL) {
+                directoryName = strndup(volume->name,
+                                        directoryName - volume->name + 1);
+
+                if (directoryName == NULL) {
+                    virReportOOMError();
+                    goto cleanup;
+                }
+
+                virBufferURIEncodeString(&buffer, directoryName);
+            }
+
+            virBufferURIEncodeString(&buffer, contentName);
+            virBufferAddLit(&buffer, "?dcPath=");
+            virBufferURIEncodeString(&buffer, priv->primary->datacenterPath);
+            virBufferAddLit(&buffer, "&dsName=");
+            virBufferURIEncodeString(&buffer, volume->pool);
+
+            if (virBufferError(&buffer)) {
+                virReportOOMError();
+                goto cleanup;
+            }
+
+            VIR_FREE(url);
+            url = virBufferContentAndReset(&buffer);
+        }
+    } else {
+        if (flags != VIR_STORAGE_VOL_UPLOAD_DEFAULT) {
+            ESX_ERROR(VIR_ERR_INVALID_ARG, "%s",
+                      _("Invalid flag combination for this volume"));
+            goto cleanup;
+        }
+    }
+
+    if (esxStreamOpenUpload(stream, priv, url) < 0) {
+        goto cleanup;
+    }
+
+    result = 0;
+
+  cleanup:
+    if (url == NULL) {
+        virBufferFreeAndReset(&buffer);
+    }
+
+    VIR_FREE(url);
+    VIR_FREE(metadata);
+    VIR_FREE(contentName);
+    VIR_FREE(directoryName);
+
+    return result;
+}
+
+
+
+static int
 esxStorageVolumeDelete(virStorageVolPtr volume, unsigned int flags)
 {
     int result = -1;
@@ -1668,6 +1934,8 @@ static virStorageDriver esxStorageDriver = {
     .volLookupByPath = esxStorageVolumeLookupByPath, /* 0.8.4 */
     .volCreateXML = esxStorageVolumeCreateXML, /* 0.8.4 */
     .volCreateXMLFrom = esxStorageVolumeCreateXMLFrom, /* 0.8.7 */
+    .volDownload = esxStorageVolumeDownload, /* 0.9.14 */
+    .volUpload = esxStorageVolumeUpload, /* 0.9.14 */
     .volDelete = esxStorageVolumeDelete, /* 0.8.7 */
     .volWipe = esxStorageVolumeWipe, /* 0.8.7 */
     .volGetInfo = esxStorageVolumeGetInfo, /* 0.8.4 */
diff --git a/src/esx/esx_stream.c b/src/esx/esx_stream.c
new file mode 100644
index 0000000..fd84d14
--- /dev/null
+++ b/src/esx/esx_stream.c
@@ -0,0 +1,610 @@
+
+/*
+ * esx_stream.c: libcurl based stream driver
+ *
+ * Copyright (C) 2012 Matthias Bolte <matthias.bolte at googlemail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ */
+
+#include <config.h>
+
+#include "internal.h"
+#include "datatypes.h"
+#include "util.h"
+#include "memory.h"
+#include "logging.h"
+#include "esx_stream.h"
+
+#define VIR_FROM_THIS VIR_FROM_ESX
+
+enum _esxStreamMode {
+    ESX_STREAM_MODE_UPLOAD = 1,
+    ESX_STREAM_MODE_DOWNLOAD = 2
+};
+
+typedef struct _esxStreamPrivate esxStreamPrivate;
+typedef enum _esxStreamMode esxStreamMode;
+
+struct _esxStreamPrivate {
+    esxVI_CURL *curl;
+    int mode;
+
+    /* Backlog of downloaded data that has not been esxStreamRecv'ed yet */
+    char *backlog;
+    size_t backlog_size;
+    size_t backlog_used;
+
+    /* Buffer given to esxStream(Send|Recv) to (read|write) data (from|to) */
+    char *buffer;
+    size_t buffer_size;
+    size_t buffer_used;
+};
+
+static size_t
+esxVI_CURL_ReadStream(char *output, size_t size, size_t nmemb, void *userdata)
+{
+    esxStreamPrivate *priv = userdata;
+    size_t output_size = size * nmemb;
+    size_t output_used = 0;
+
+    if (output_size > priv->buffer_used) {
+        output_used = priv->buffer_used;
+    } else {
+        output_used = output_size;
+    }
+
+    memcpy(output, priv->buffer + priv->buffer_size - priv->buffer_used,
+           output_used);
+
+    priv->buffer_used -= output_used;
+
+    return output_used;
+}
+
+static size_t
+esxVI_CURL_WriteStream(char *input, size_t size, size_t nmemb, void *userdata)
+{
+    esxStreamPrivate *priv = userdata;
+    size_t input_size = size * nmemb;
+    size_t input_used = priv->buffer_size - priv->buffer_used;
+
+    if (input_used > input_size) {
+        input_used = input_size;
+    }
+
+    /* Fill buffer */
+    memcpy(priv->buffer + priv->buffer_used, input, input_used);
+    priv->buffer_used += input_used;
+
+    /* Move rest to backlog */
+    if (input_size > input_used) {
+        size_t input_remaining = input_size - input_used;
+        size_t backlog_remaining = priv->backlog_size - priv->backlog_used;
+
+        if (priv->backlog == NULL) {
+            priv->backlog_size = input_remaining;
+            priv->backlog_used = 0;
+
+            if (VIR_ALLOC_N(priv->backlog, priv->backlog_size) < 0) {
+                virReportOOMError();
+                return 0;
+            }
+        } else if (input_remaining > backlog_remaining) {
+            priv->backlog_size += input_remaining - backlog_remaining;
+
+            if (VIR_REALLOC_N(priv->backlog, priv->backlog_size) < 0) {
+                virReportOOMError();
+                return 0;
+            }
+        }
+
+        memcpy(priv->backlog + priv->backlog_used, input + input_used,
+               input_remaining);
+
+        priv->backlog_used += input_remaining;
+    }
+
+    return input_size;
+}
+
+/* Return -1 on error and 1 when it's done */
+static int
+esxStreamTransferNonBlocking(esxStreamPrivate *priv)
+{
+    int runningHandles = 0;
+    CURLMcode multiErrorCode;
+    CURLcode errorCode;
+
+    /* Perform transfer */
+    do {
+        multiErrorCode = curl_multi_perform(priv->curl->multi->handle,
+                                            &runningHandles);
+    } while (multiErrorCode == CURLM_CALL_MULTI_PERFORM);
+
+    if (multiErrorCode != CURLM_OK) {
+        ESX_ERROR(VIR_ERR_INTERNAL_ERROR,
+                  _("Could not transfer data: %s (%d)"),
+                  curl_multi_strerror(multiErrorCode), multiErrorCode);
+        return -1;
+    }
+
+    /* Check status */
+    if (runningHandles == 0) {
+        long responseCode = 0;
+        int status = esxVI_MultiCURL_CheckFirstMessage(priv->curl->multi,
+                                                       &responseCode,
+                                                       &errorCode);
+
+        if (status == 0) {
+            return 1;
+        }
+
+        if (status < 0) {
+            ESX_ERROR(VIR_ERR_INTERNAL_ERROR,
+                      _("Could not complete transfer: %s (%d)"),
+                      curl_easy_strerror(errorCode), errorCode);
+            return -1;
+        }
+
+        if (responseCode != 200 && responseCode != 201) {
+            ESX_ERROR(VIR_ERR_INTERNAL_ERROR,
+                      _("Unexpected HTTP response code %lu"),
+                      responseCode);
+            return -1;
+        }
+    }
+
+    return 1;
+}
+
+/* Return -1 on error, 0 when it needs to be called again, and 1 when it 's done */
+static int
+esxStreamTransferBlocking(esxStreamPrivate *priv)
+{
+    int runningHandles = 0;
+    CURLMcode multiErrorCode;
+    fd_set fdread;
+    fd_set fdwrite;
+    fd_set fdexcep;
+    int maxfd = 0;
+    long timeout = -1;
+    struct timeval tv_timeout;
+    int rc;
+    CURLcode errorCode;
+
+    /* Select */
+    curl_multi_timeout(priv->curl->multi->handle, &timeout);
+
+    if (timeout < 0) {
+        timeout = 1000; /* default to 1 sec timeout */
+    }
+
+    tv_timeout.tv_sec = 0;
+    tv_timeout.tv_usec = timeout * 1000;
+
+    while (tv_timeout.tv_usec >= 1000000) {
+        tv_timeout.tv_sec += 1;
+        tv_timeout.tv_usec -= 1000000;
+    }
+
+    do {
+        rc = 0;
+
+        FD_ZERO(&fdread);
+        FD_ZERO(&fdwrite);
+        FD_ZERO(&fdexcep);
+
+        curl_multi_fdset(priv->curl->multi->handle,
+                         &fdread, &fdwrite, &fdexcep, &maxfd);
+
+        if (maxfd >= 0) {
+            rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &tv_timeout);
+        }
+    } while (rc < 0 && errno == EINTR);
+
+    if (rc < 0) {
+        virReportSystemError(errno, "%s",
+                             _("Could not block on transfer"));
+        return -1;
+    }
+
+    /* Perform transfer */
+    do {
+        multiErrorCode = curl_multi_perform(priv->curl->multi->handle,
+                                            &runningHandles);
+    } while (multiErrorCode == CURLM_CALL_MULTI_PERFORM);
+
+    if (multiErrorCode != CURLM_OK) {
+        ESX_ERROR(VIR_ERR_INTERNAL_ERROR,
+                  _("Could not transfer data: %s (%d)"),
+                  curl_multi_strerror(multiErrorCode), multiErrorCode);
+        return -1;
+    }
+
+    /* Check status */
+    if (runningHandles == 0) {
+        long responseCode = 0;
+        int status = esxVI_MultiCURL_CheckFirstMessage(priv->curl->multi,
+                                                       &responseCode,
+                                                       &errorCode);
+
+        if (status == 0) {
+            return 1;
+        }
+
+        if (status < 0) {
+            ESX_ERROR(VIR_ERR_INTERNAL_ERROR,
+                      _("Could not complete transfer: %s (%d)"),
+                      curl_easy_strerror(errorCode), errorCode);
+            return -1;
+        }
+
+        if (responseCode != 200 && responseCode != 206) {
+            ESX_ERROR(VIR_ERR_INTERNAL_ERROR,
+                      _("Unexpected HTTP response code %lu"),
+                      responseCode);
+            return -1;
+        }
+
+        return 1;
+    }
+
+    return 0;
+}
+
+static int
+esxStreamSend(virStreamPtr stream, const char *data, size_t nbytes)
+{
+    int result = -1;
+    esxStreamPrivate *priv = stream->privateData;
+    int runningHandles = 0;
+    CURLMcode multiErrorCode;
+
+    if (nbytes < 1) {
+        return 0;
+    }
+
+    if (priv == NULL) {
+        ESX_ERROR(VIR_ERR_INTERNAL_ERROR, "%s", _("Stream is not open"));
+        return -1;
+    }
+
+    if (priv->mode != ESX_STREAM_MODE_UPLOAD) {
+        ESX_ERROR(VIR_ERR_INVALID_ARG, "%s", _("Not an upload stream"));
+        return -1;
+    }
+
+    virMutexLock(&priv->curl->lock);
+
+    priv->buffer = (char *)data;
+    priv->buffer_size = nbytes;
+    priv->buffer_used = nbytes;
+
+    if (stream->flags & VIR_STREAM_NONBLOCK) {
+        if (esxStreamTransferNonBlocking(priv) < 0) {
+            goto cleanup;
+        }
+
+        if (priv->buffer_used < priv->buffer_size) {
+            result = priv->buffer_size - priv->buffer_used;
+        } else {
+            result = -2;
+        }
+    } else /* blocking */ {
+        do {
+            multiErrorCode = curl_multi_perform(priv->curl->multi->handle,
+                                                &runningHandles);
+        } while (multiErrorCode == CURLM_CALL_MULTI_PERFORM);
+
+        if (multiErrorCode != CURLM_OK) {
+            ESX_ERROR(VIR_ERR_INTERNAL_ERROR,
+                      _("Could not transfer data: %s (%d)"),
+                      curl_multi_strerror(multiErrorCode), multiErrorCode);
+            goto cleanup;
+        }
+
+        do {
+            int status = esxStreamTransferBlocking(priv);
+
+            if (status < 0) {
+                goto cleanup;
+            }
+
+            if (status > 0) {
+                break;
+            }
+        } while (priv->buffer_used > 0);
+
+        result = priv->buffer_size - priv->buffer_used;
+    }
+
+  cleanup:
+    virMutexUnlock(&priv->curl->lock);
+
+    return result;
+}
+
+static int
+esxStreamRecv(virStreamPtr stream, char *data, size_t nbytes)
+{
+    int result = -1;
+    esxStreamPrivate *priv = stream->privateData;
+    int runningHandles = 0;
+    CURLMcode multiErrorCode;
+
+    if (nbytes < 1) {
+        return 0;
+    }
+
+    if (priv == NULL) {
+        ESX_ERROR(VIR_ERR_INTERNAL_ERROR, "%s", _("Stream is not open"));
+        return -1;
+    }
+
+    if (priv->mode != ESX_STREAM_MODE_DOWNLOAD) {
+        ESX_ERROR(VIR_ERR_INVALID_ARG, "%s", _("Not a download stream"));
+        return -1;
+    }
+
+    virMutexLock(&priv->curl->lock);
+
+    priv->buffer = data;
+    priv->buffer_size = nbytes;
+    priv->buffer_used = 0;
+
+    if (priv->backlog_used > 0) {
+        if (priv->buffer_size > priv->backlog_used) {
+            priv->buffer_used = priv->backlog_used;
+        } else {
+            priv->buffer_used = priv->buffer_size;
+        }
+
+        memcpy(priv->buffer, priv->backlog, priv->buffer_used);
+        memmove(priv->backlog, priv->backlog + priv->buffer_used,
+                priv->backlog_used - priv->buffer_used);
+
+        priv->backlog_used -= priv->buffer_used;
+
+        result = priv->buffer_used;
+    } else if (stream->flags & VIR_STREAM_NONBLOCK) {
+        if (esxStreamTransferNonBlocking(priv) < 0) {
+            goto cleanup;
+        }
+
+        if (priv->buffer_used > 0) {
+            result = priv->buffer_used;
+        } else {
+            result = -2;
+        }
+    } else /* blocking */ {
+        do {
+            multiErrorCode = curl_multi_perform(priv->curl->multi->handle,
+                                                &runningHandles);
+        } while (multiErrorCode == CURLM_CALL_MULTI_PERFORM);
+
+        if (multiErrorCode != CURLM_OK) {
+            ESX_ERROR(VIR_ERR_INTERNAL_ERROR,
+                      _("curl_multi_perform() returned an error: %s (%d)"),
+                      curl_multi_strerror(multiErrorCode), multiErrorCode);
+            goto cleanup;
+        }
+
+        do {
+            int status = esxStreamTransferBlocking(priv);
+
+            if (status < 0) {
+                goto cleanup;
+            }
+
+            if (status > 0) {
+                break;
+            }
+        } while (priv->buffer_used < priv->buffer_size);
+
+        result = priv->buffer_used;
+    }
+
+  cleanup:
+    virMutexUnlock(&priv->curl->lock);
+
+    return result;
+}
+
+static void
+esxFreeStreamPrivate(esxStreamPrivate **priv)
+{
+    if (priv == NULL || *priv == NULL) {
+        return;
+    }
+
+    esxVI_CURL_Free(&(*priv)->curl);
+    VIR_FREE((*priv)->backlog);
+    VIR_FREE(*priv);
+}
+
+static int
+esxStreamClose(virStreamPtr stream, bool finish)
+{
+    int result = 0;
+    esxStreamPrivate *priv = stream->privateData;
+
+    if (priv == NULL) {
+        return 0;
+    }
+
+    virMutexLock(&priv->curl->lock);
+
+    if (finish && priv->backlog_used > 0) {
+        ESX_ERROR(VIR_ERR_INTERNAL_ERROR, "%s",
+                  _("Stream has untransferred data left"));
+        result = -1;
+    }
+
+    stream->privateData = NULL;
+
+    virMutexUnlock(&priv->curl->lock);
+
+    esxFreeStreamPrivate(&priv);
+
+    return result;
+}
+
+static int
+esxStreamFinish(virStreamPtr stream)
+{
+    return esxStreamClose(stream, true);
+}
+
+static int
+esxStreamAbort(virStreamPtr stream)
+{
+    return esxStreamClose(stream, false);
+}
+
+virStreamDriver esxStreamDriver = {
+    .streamSend = esxStreamSend,
+    .streamRecv = esxStreamRecv,
+    /* FIXME: streamAddCallback missing */
+    /* FIXME: streamUpdateCallback missing */
+    /* FIXME: streamRemoveCallback missing */
+    .streamFinish = esxStreamFinish,
+    .streamAbort = esxStreamAbort,
+};
+
+static int
+esxStreamOpen(virStreamPtr stream, esxPrivate *priv, const char *url,
+              unsigned long long offset, unsigned long long length)
+{
+    int result = -1;
+    esxStreamPrivate *streamPriv;
+    char *range = NULL;
+    char *userpwd = NULL;
+    esxVI_MultiCURL *multi = NULL;
+
+    /* Create stream private data */
+    if (VIR_ALLOC(streamPriv) < 0) {
+        virReportOOMError();
+        return -1;
+    }
+
+    if (length > 0) {
+        if (virAsprintf(&range, "%llu-%llu", offset, offset + length - 1) < 0) {
+            virReportOOMError();
+            goto cleanup;
+        }
+    } else if (offset > 0) {
+        if (virAsprintf(&range, "%llu-", offset) < 0) {
+            virReportOOMError();
+            goto cleanup;
+        }
+    }
+
+    /*
+     * Initialize CURL handle. We cannot use an easy handle alone here because
+     * curl_easy_perform whould do the whole transfer before it returns. But
+     * there is no place in the stream handling that would allow for such a
+     * call. The stream is driven by esxStream(Send|Recv) that is probably
+     * called multiple times to send/receive the stream in chunks. Therefore,
+     * we use the multi handle here that allows to perform the (up|down)load in
+     * chunks and also allows to support non-blocking operations.
+     */
+    if (esxVI_CURL_Alloc(&streamPriv->curl) < 0 ||
+        esxVI_CURL_Connect(streamPriv->curl, priv->parsedUri) < 0 ||
+        esxVI_MultiCURL_Alloc(&multi) < 0 ||
+        esxVI_MultiCURL_Add(multi, streamPriv->curl) < 0) {
+        goto cleanup;
+    }
+
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_URL, url);
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_RANGE, range);
+
+#if LIBCURL_VERSION_NUM >= 0x071301 /* 7.19.1 */
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_USERNAME,
+                     priv->primary->username);
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_PASSWORD,
+                     priv->primary->password);
+#else
+    if (virAsprintf(&userpwd, "%s:%s", priv->primary->username,
+                    priv->primary->password) < 0) {
+        virReportOOMError();
+        goto cleanup;
+    }
+
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_USERPWD, userpwd);
+#endif
+
+    stream->driver = &esxStreamDriver;
+    stream->privateData = streamPriv;
+
+    result = 0;
+
+  cleanup:
+    if (result < 0) {
+        if (streamPriv->curl != NULL && multi != streamPriv->curl->multi) {
+            esxVI_MultiCURL_Free(&multi);
+        }
+
+        esxFreeStreamPrivate(&streamPriv);
+    }
+
+    VIR_FREE(range);
+    VIR_FREE(userpwd);
+
+    return result;
+}
+
+int
+esxStreamOpenUpload(virStreamPtr stream, esxPrivate *priv, const char *url)
+{
+    esxStreamPrivate *streamPriv;
+
+    if (esxStreamOpen(stream, priv, url, 0, 0) < 0) {
+        return -1;
+    }
+
+    streamPriv = stream->privateData;
+    streamPriv->mode = ESX_STREAM_MODE_UPLOAD;
+
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_UPLOAD, 1);
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_READDATA, streamPriv);
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_READFUNCTION,
+                     esxVI_CURL_ReadStream);
+
+    return 0;
+}
+
+int
+esxStreamOpenDownload(virStreamPtr stream, esxPrivate *priv, const char *url,
+                      unsigned long long offset, unsigned long long length)
+{
+    esxStreamPrivate *streamPriv;
+
+    if (esxStreamOpen(stream, priv, url, offset, length) < 0) {
+        return -1;
+    }
+
+    streamPriv = stream->privateData;
+    streamPriv->mode = ESX_STREAM_MODE_DOWNLOAD;
+
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_UPLOAD, 0);
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_HTTPGET, 1);
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_WRITEDATA, streamPriv);
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_WRITEFUNCTION,
+                     esxVI_CURL_WriteStream);
+
+    return 0;
+}
diff --git a/src/esx/esx_stream.h b/src/esx/esx_stream.h
new file mode 100644
index 0000000..ef4e2a8
--- /dev/null
+++ b/src/esx/esx_stream.h
@@ -0,0 +1,33 @@
+
+/*
+ * esx_stream.h: libcurl based stream driver
+ *
+ * Copyright (C) 2012 Matthias Bolte <matthias.bolte at googlemail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ */
+
+#ifndef __ESX_STREAM_H__
+# define __ESX_STREAM_H__
+
+# include "internal.h"
+# include "esx_private.h"
+
+int esxStreamOpenUpload(virStreamPtr stream, esxPrivate *priv, const char *url);
+int esxStreamOpenDownload(virStreamPtr stream, esxPrivate *priv, const char *url,
+                          unsigned long long offset, unsigned long long length);
+
+#endif /* __ESX_STREAM_H__ */
diff --git a/src/esx/esx_util.c b/src/esx/esx_util.c
index 478f60b..f4c37c5 100644
--- a/src/esx/esx_util.c
+++ b/src/esx/esx_util.c
@@ -331,6 +331,88 @@ esxUtil_ParseDatastorePath(const char *datastorePath, char **datastoreName,
 
 
 
+char *
+esxUtil_ParseVMDKMetadata(const char *metadata, /* might not be NUL-terminated */
+                          unsigned long long metadata_length)
+{
+    /* Parse 'test-flat.vmdk' from 'RW 4194304 VMFS "test-flat.vmdk"' */
+    const char *p = metadata;
+    const char *end = metadata + metadata_length;
+    const char *s = NULL;
+    char *contentName = NULL;
+
+    while (p < end) {
+        /* Skip leading whitespace */
+        while (p < end && (*p == ' ' || *p == '\t')) {
+            ++p;
+        }
+
+        if (p + 3 < end && STRPREFIX(p, "RW ")) {
+            /* Skip until first '"' */
+            while (p < end && *p != '"') {
+                if (*p == '\n') {
+                    goto nextline;
+                }
+
+                ++p;
+            }
+
+            if (p < end) {
+                ++p;
+            }
+
+            s = p;
+
+            /* Read until second '"' */
+            while (p < end && *p != '"') {
+                if (*p == '\n') {
+                    goto nextline;
+                }
+
+                ++p;
+            }
+
+            if (p < end && *p == '"') {
+                contentName = strndup(s, p - s);
+
+                if (contentName == NULL) {
+                    virReportOOMError();
+                    return NULL;
+                }
+
+                break;
+            }
+        }
+
+        /* Skip rest of line */
+        while (p < end && *p != '\n') {
+            ++p;
+        }
+
+  nextline:
+        while (p < end && *p == '\n') {
+            ++p;
+        }
+    }
+
+    if (contentName == NULL) {
+        ESX_ERROR(VIR_ERR_INTERNAL_ERROR, "%s",
+                  _("Metadata does not contain a content file name"));
+        return NULL;
+    }
+
+    if (strchr(contentName, '/') != NULL)  {
+        VIR_FREE(contentName);
+        ESX_ERROR(VIR_ERR_INTERNAL_ERROR, "%s",
+                  _("Metadata contains invalid content file name"));
+        return NULL;
+    }
+
+    return contentName;
+}
+
+
+
 int
 esxUtil_ResolveHostname(const char *hostname,
                         char *ipAddress, size_t ipAddress_length)
diff --git a/src/esx/esx_util.h b/src/esx/esx_util.h
index a69b3f4..8cd0e4b 100644
--- a/src/esx/esx_util.h
+++ b/src/esx/esx_util.h
@@ -49,6 +49,8 @@ int esxUtil_ParseVirtualMachineIDString(const char *id_string, int *id);
 int esxUtil_ParseDatastorePath(const char *datastorePath, char **datastoreName,
                                char **directoryName, char **directoryAndFileName);
 
+char *esxUtil_ParseVMDKMetadata(const char *metadata, unsigned long long metadata_length);
+
 int esxUtil_ResolveHostname(const char *hostname,
                             char *ipAddress, size_t ipAddress_length);
 
diff --git a/src/esx/esx_vi.c b/src/esx/esx_vi.c
index 3f8d745..ebb3ed9 100644
--- a/src/esx/esx_vi.c
+++ b/src/esx/esx_vi.c
@@ -724,6 +724,35 @@ esxVI_MultiCURL_Remove(esxVI_MultiCURL *multi, esxVI_CURL *curl)
     return 0;
 }
 
+/* Returns -1 on error, 0 if there is no message, 1 if there is a message */
+int
+esxVI_MultiCURL_CheckFirstMessage(esxVI_MultiCURL *multi, long *responseCode,
+                                  CURLcode *errorCode)
+{
+    int messagesInQueue;
+    CURLMsg* msg = curl_multi_info_read(multi->handle, &messagesInQueue);
+
+    *responseCode = 0;
+
+    if (msg == NULL) {
+        return 0;
+    }
+
+    if (msg->msg != CURLMSG_DONE) {
+        return 0;
+    }
+
+    *errorCode = msg->data.result;
+
+    if (*errorCode != CURLE_OK) {
+        return -1;
+    }
+
+    curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, responseCode);
+
+    return 1;
+}
+
 
 
 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
diff --git a/src/esx/esx_vi.h b/src/esx/esx_vi.h
index 49b7ca2..c464725 100644
--- a/src/esx/esx_vi.h
+++ b/src/esx/esx_vi.h
@@ -203,6 +203,8 @@ int esxVI_MultiCURL_Alloc(esxVI_MultiCURL **multi);
 void esxVI_MultiCURL_Free(esxVI_MultiCURL **multi);
 int esxVI_MultiCURL_Add(esxVI_MultiCURL *multi, esxVI_CURL *curl);
 int esxVI_MultiCURL_Remove(esxVI_MultiCURL *multi, esxVI_CURL *curl);
+int esxVI_MultiCURL_CheckFirstMessage(esxVI_MultiCURL *multi, long *responseCode,
+                                      CURLcode *errorCode);
 
 
 
diff --git a/tools/virsh.c b/tools/virsh.c
index 53d1825..1fca8c8 100644
--- a/tools/virsh.c
+++ b/tools/virsh.c
@@ -12044,6 +12044,8 @@ static const vshCmdOptDef opts_vol_upload[] = {
     {"pool", VSH_OT_STRING, 0, N_("pool name or uuid")},
     {"offset", VSH_OT_INT, 0, N_("volume offset to upload to") },
     {"length", VSH_OT_INT, 0, N_("amount of data to upload") },
+    {"metadata", VSH_OT_BOOL, 0, N_("upload metadata only") },
+    {"content", VSH_OT_BOOL, 0, N_("upload content only") },
     {NULL, 0, 0, NULL}
 };
 
@@ -12066,6 +12068,7 @@ cmdVolUpload (vshControl *ctl, const vshCmd *cmd)
     virStreamPtr st = NULL;
     const char *name = NULL;
     unsigned long long offset = 0, length = 0;
+    unsigned int flags = 0;
 
     if (!vshConnectionUsability(ctl, ctl->conn))
         goto cleanup;
@@ -12080,6 +12083,12 @@ cmdVolUpload (vshControl *ctl, const vshCmd *cmd)
         return false;
     }
 
+    if (vshCommandOptBool(cmd, "metadata"))
+        flags |= VIR_STORAGE_VOL_UPLOAD_METADATA;
+
+    if (vshCommandOptBool(cmd, "content"))
+        flags |= VIR_STORAGE_VOL_UPLOAD_CONTENT;
+
     if (!(vol = vshCommandOptVol(ctl, cmd, "vol", "pool", &name))) {
         return false;
     }
@@ -12095,7 +12104,7 @@ cmdVolUpload (vshControl *ctl, const vshCmd *cmd)
     }
 
     st = virStreamNew(ctl->conn, 0);
-    if (virStorageVolUpload(vol, st, offset, length, 0) < 0) {
+    if (virStorageVolUpload(vol, st, offset, length, flags) < 0) {
         vshError(ctl, _("cannot upload to volume %s"), name);
         goto cleanup;
     }
@@ -12144,6 +12153,8 @@ static const vshCmdOptDef opts_vol_download[] = {
     {"pool", VSH_OT_STRING, 0, N_("pool name or uuid")},
     {"offset", VSH_OT_INT, 0, N_("volume offset to download from") },
     {"length", VSH_OT_INT, 0, N_("amount of data to download") },
+    {"metadata", VSH_OT_BOOL, 0, N_("download metadata only") },
+    {"content", VSH_OT_BOOL, 0, N_("download content only") },
     {NULL, 0, 0, NULL}
 };
 
@@ -12157,6 +12168,7 @@ cmdVolDownload (vshControl *ctl, const vshCmd *cmd)
     virStreamPtr st = NULL;
     const char *name = NULL;
     unsigned long long offset = 0, length = 0;
+    unsigned int flags = 0;
     bool created = false;
 
     if (!vshConnectionUsability(ctl, ctl->conn))
@@ -12172,6 +12184,12 @@ cmdVolDownload (vshControl *ctl, const vshCmd *cmd)
         return false;
     }
 
+    if (vshCommandOptBool(cmd, "metadata"))
+        flags |= VIR_STORAGE_VOL_DOWNLOAD_METADATA;
+
+    if (vshCommandOptBool(cmd, "content"))
+        flags |= VIR_STORAGE_VOL_DOWNLOAD_CONTENT;
+
     if (!(vol = vshCommandOptVol(ctl, cmd, "vol", "pool", &name)))
         return false;
 
@@ -12191,7 +12209,7 @@ cmdVolDownload (vshControl *ctl, const vshCmd *cmd)
     }
 
     st = virStreamNew(ctl->conn, 0);
-    if (virStorageVolDownload(vol, st, offset, length, 0) < 0) {
+    if (virStorageVolDownload(vol, st, offset, length, flags) < 0) {
         vshError(ctl, _("cannot download from volume %s"), name);
         goto cleanup;
     }
-- 
1.7.4.1




More information about the libvir-list mailing list