[libvirt] [PATCH libvirt-glib 1/5] Add support for writing to streams

Daniel P. Berrange berrange at redhat.com
Tue Nov 22 12:39:28 UTC 2011


From: "Daniel P. Berrange" <berrange at redhat.com>

---
 libvirt-gobject/Makefile.am                     |    8 +-
 libvirt-gobject/libvirt-gobject-output-stream.c |  240 +++++++++++++++++++++++
 libvirt-gobject/libvirt-gobject-output-stream.h |   68 +++++++
 libvirt-gobject/libvirt-gobject-stream.c        |  115 +++++++++++-
 libvirt-gobject/libvirt-gobject-stream.h        |   27 ++-
 5 files changed, 447 insertions(+), 11 deletions(-)
 create mode 100644 libvirt-gobject/libvirt-gobject-output-stream.c
 create mode 100644 libvirt-gobject/libvirt-gobject-output-stream.h

diff --git a/libvirt-gobject/Makefile.am b/libvirt-gobject/Makefile.am
index 0eef9c8..ec7b454 100644
--- a/libvirt-gobject/Makefile.am
+++ b/libvirt-gobject/Makefile.am
@@ -45,8 +45,7 @@ GOBJECT_GENERATED_FILES = \
 
 libvirt_gobject_1_0_ladir = $(includedir)/libvirt-gobject-1.0/libvirt-gobject
 libvirt_gobject_1_0_la_HEADERS = \
-			$(GOBJECT_HEADER_FILES) \
-			libvirt-gobject-input-stream.h
+			$(GOBJECT_HEADER_FILES)
 nodist_libvirt_gobject_1_0_la_HEADERS = \
 			libvirt-gobject-enums.h
 libvirt_gobject_1_0_la_SOURCES = \
@@ -54,7 +53,10 @@ libvirt_gobject_1_0_la_SOURCES = \
 			$(GOBJECT_SOURCE_FILES) \
 			libvirt-gobject-domain-device-private.h \
 			libvirt-gobject-compat.h \
-			libvirt-gobject-input-stream.c
+			libvirt-gobject-input-stream.h \
+			libvirt-gobject-input-stream.c \
+			libvirt-gobject-output-stream.h \
+			libvirt-gobject-output-stream.c
 nodist_libvirt_gobject_1_0_la_SOURCES = \
 			$(GOBJECT_GENERATED_FILES)
 libvirt_gobject_1_0_la_CFLAGS = \
diff --git a/libvirt-gobject/libvirt-gobject-output-stream.c b/libvirt-gobject/libvirt-gobject-output-stream.c
new file mode 100644
index 0000000..30ee519
--- /dev/null
+++ b/libvirt-gobject/libvirt-gobject-output-stream.c
@@ -0,0 +1,240 @@
+/*
+ * libvirt-gobject-output-stream.c: libvirt gobject integration
+ *
+ * Copyright (C) 2011 Red Hat
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Authors: Daniel P. Berrange <berrange at redhat.com>
+ *          Marc-André Lureau <marcandre.lureau at redhat.com>
+ */
+
+#include <config.h>
+
+#include <libvirt/virterror.h>
+#include <string.h>
+
+#include "libvirt-glib/libvirt-glib.h"
+#include "libvirt-gobject/libvirt-gobject.h"
+#include "libvirt-gobject-output-stream.h"
+
+extern gboolean debugFlag; /* defined outside this file; gates DEBUG() output */
+
+#define DEBUG(fmt, ...) do { if (G_UNLIKELY(debugFlag)) g_debug(fmt, ## __VA_ARGS__); } while (0)
+
+#define gvir_output_stream_get_type _gvir_output_stream_get_type /* header declares the _-prefixed name */
+G_DEFINE_TYPE(GVirOutputStream, gvir_output_stream, G_TYPE_OUTPUT_STREAM);
+
+enum
+{
+    PROP_0,      /* placeholder so that real property ids start at 1 */
+    PROP_STREAM  /* id of the "stream" property */
+};
+
+struct _GVirOutputStreamPrivate
+{
+    GVirStream *stream;          /* wrapped stream; stored without taking a reference */
+
+    /* pending operation metadata */
+    GSimpleAsyncResult *result;  /* non-NULL while an async write is pending */
+    GCancellable *cancellable;   /* reference to the caller's cancellable, or NULL */
+    const void * buffer;         /* caller-owned data to send */
+    gsize count;                 /* number of bytes requested from @buffer */
+};
+
+/* GObject get_property vfunc; the only readable property is "stream". */
+static void gvir_output_stream_get_property(GObject    *object,
+                                            guint       prop_id,
+                                            GValue     *value,
+                                            GParamSpec *pspec)
+{
+    GVirOutputStreamPrivate *priv = GVIR_OUTPUT_STREAM(object)->priv;
+
+    if (prop_id != PROP_STREAM) {
+        /* Unknown id: emit the standard GObject warning and bail out. */
+        G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+        return;
+    }
+
+    g_value_set_object(value, priv->stream);
+}
+
+/* GObject set_property vfunc; "stream" is stored without taking a reference. */
+static void gvir_output_stream_set_property(GObject      *object,
+                                            guint         prop_id,
+                                            const GValue *value,
+                                            GParamSpec   *pspec)
+{
+    GVirOutputStreamPrivate *priv = GVIR_OUTPUT_STREAM(object)->priv;
+
+    if (prop_id != PROP_STREAM) {
+        /* Unknown id: emit the standard GObject warning and bail out. */
+        G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+        return;
+    }
+
+    priv->stream = g_value_get_object(value);
+}
+
+/* GObject finalize vfunc: forget the unowned GVirStream, then chain up. */
+static void gvir_output_stream_finalize(GObject *object)
+{
+    GVirOutputStreamPrivate *priv = GVIR_OUTPUT_STREAM(object)->priv;
+    GObjectClass *parent = G_OBJECT_CLASS(gvir_output_stream_parent_class);
+    DEBUG("Finalize output stream GVirStream=%p", priv->stream);
+    priv->stream = NULL; /* no unref: the pointer was never referenced */
+    if (parent->finalize)
+        parent->finalize(object);
+}
+
+/* Stream event callback: runs the write queued by write_async once the
+ * handle is writable, then completes the pending GSimpleAsyncResult. */
+static void
+gvir_output_stream_write_ready(virStreamPtr st G_GNUC_UNUSED,
+                               int events, void *opaque)
+{
+    GVirOutputStreamPrivate *priv = GVIR_OUTPUT_STREAM(opaque)->priv;
+    GSimpleAsyncResult *simple;
+    GError *error = NULL;
+    gssize result;
+
+    g_return_if_fail(events & VIR_STREAM_EVENT_WRITABLE);
+
+    /* In this file the callback is only unregistered by write_finish, so it
+     * can fire with no write pending; never resend the previous buffer. */
+    if (priv->result == NULL)
+        return;
+
+    result = gvir_stream_send(priv->stream, priv->buffer, priv->count,
+                              priv->cancellable, &error);
+
+    if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+        g_warn_if_reached();
+        g_clear_error(&error); /* stay pending, but do not leak the error */
+        return;
+    }
+
+    simple = priv->result;
+    priv->result = NULL;
+    if (result >= 0)
+        g_simple_async_result_set_op_res_gssize(simple, result);
+    if (error)
+        g_simple_async_result_take_error(simple, error);
+    if (priv->cancellable) {
+        g_object_unref(priv->cancellable);
+        priv->cancellable = NULL;
+    }
+    g_simple_async_result_complete(simple);
+    g_object_unref(simple);
+}
+
+/* GOutputStream write_async vfunc: registers a writable-event callback on
+ * the stream handle and records the request for write_ready to carry out;
+ * if registration fails, the error is reported to @callback from idle. */
+static void gvir_output_stream_write_async(GOutputStream *stream,
+                                           const void *buffer,
+                                           gsize count,
+                                           int io_priority G_GNUC_UNUSED,
+                                           GCancellable *cancellable,
+                                           GAsyncReadyCallback callback,
+                                           gpointer user_data)
+{
+    GVirOutputStream *output_stream = GVIR_OUTPUT_STREAM(stream);
+    GVirOutputStreamPrivate *priv;
+    virStreamPtr handle;
+    int rc;
+
+    g_return_if_fail(GVIR_IS_OUTPUT_STREAM(stream));
+    g_return_if_fail(output_stream->priv->result == NULL);
+
+    priv = output_stream->priv;
+    g_object_get(priv->stream, "handle", &handle, NULL);
+
+    rc = virStreamEventAddCallback(handle, VIR_STREAM_EVENT_WRITABLE,
+                                   gvir_output_stream_write_ready, stream, NULL);
+    if (rc >= 0) {
+        priv->result = g_simple_async_result_new(G_OBJECT(stream), callback, user_data,
+                                                 gvir_output_stream_write_async);
+        priv->cancellable = cancellable ? g_object_ref(cancellable) : NULL;
+        priv->buffer = buffer;
+        priv->count = count;
+    } else {
+        g_simple_async_report_error_in_idle(G_OBJECT(stream), callback, user_data,
+                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
+                                            "Couldn't add event callback %s",
+                                            G_STRFUNC);
+    }
+
+    virStreamFree(handle); /* release the handle obtained via g_object_get() */
+}
+
+
+/* GOutputStream write_finish vfunc: returns the byte count stored in
+ * @result and unregisters the stream event callback; @error is unused. */
+static gssize gvir_output_stream_write_finish(GOutputStream  *stream,
+                                              GAsyncResult  *result,
+                                              GError **error G_GNUC_UNUSED)
+{
+    GVirOutputStream *self = GVIR_OUTPUT_STREAM(stream);
+    GSimpleAsyncResult *simple;
+    virStreamPtr st;
+    gssize written;
+
+    g_return_val_if_fail(GVIR_IS_OUTPUT_STREAM(stream), -1);
+    g_object_get(self->priv->stream, "handle", &st, NULL);
+
+    simple = G_SIMPLE_ASYNC_RESULT(result);
+    g_warn_if_fail(g_simple_async_result_get_source_tag(simple) == gvir_output_stream_write_async);
+    written = g_simple_async_result_get_op_res_gssize(simple);
+
+    /* Pairs with virStreamEventAddCallback() in write_async. */
+    virStreamEventRemoveCallback(st);
+    virStreamFree(st);
+    return written;
+}
+
+
+/* Class initializer: wires up the GObject and GOutputStream vfuncs and
+ * installs the construct-only "stream" property. */
+static void gvir_output_stream_class_init(GVirOutputStreamClass *klass)
+{
+    GObjectClass *object_class = G_OBJECT_CLASS(klass);
+    GOutputStreamClass *ostream_class = G_OUTPUT_STREAM_CLASS(klass);
+    GParamSpec *pspec;
+
+    g_type_class_add_private(klass, sizeof(GVirOutputStreamPrivate));
+
+    object_class->finalize = gvir_output_stream_finalize;
+    object_class->get_property = gvir_output_stream_get_property;
+    object_class->set_property = gvir_output_stream_set_property;
+
+    ostream_class->write_fn = NULL; /* no synchronous write vfunc is installed */
+    ostream_class->write_async = gvir_output_stream_write_async;
+    ostream_class->write_finish = gvir_output_stream_write_finish;
+
+    pspec = g_param_spec_object("stream", "stream", "GVirStream", GVIR_TYPE_STREAM,
+                                G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+    g_object_class_install_property(object_class, PROP_STREAM, pspec);
+}
+
+static void gvir_output_stream_init(GVirOutputStream *stream)
+{   /* Instance initializer: cache the pointer to the private data. */
+    stream->priv = G_TYPE_INSTANCE_GET_PRIVATE(stream, GVIR_TYPE_OUTPUT_STREAM, GVirOutputStreamPrivate);
+}
+
+GVirOutputStream* _gvir_output_stream_new(GVirStream *stream)
+{   /* Construct an output stream whose "stream" property is @stream. */
+    return GVIR_OUTPUT_STREAM(g_object_new(GVIR_TYPE_OUTPUT_STREAM, "stream", stream, NULL));
+}
diff --git a/libvirt-gobject/libvirt-gobject-output-stream.h b/libvirt-gobject/libvirt-gobject-output-stream.h
new file mode 100644
index 0000000..0ca0053
--- /dev/null
+++ b/libvirt-gobject/libvirt-gobject-output-stream.h
@@ -0,0 +1,68 @@
+/*
+ * libvirt-gobject-output-stream.h: libvirt gobject integration
+ *
+ * Copyright (C) 2011 Red Hat
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Authors: Daniel P. Berrange <berrange at redhat.com>
+ *          Marc-André Lureau <marcandre.lureau at redhat.com>
+ */
+
+#if !defined(__LIBVIRT_GOBJECT_H__) && !defined(LIBVIRT_GOBJECT_BUILD)
+#error "Only <libvirt-gobject/libvirt-gobject.h> can be included directly."
+#endif
+
+#ifndef __LIBVIRT_GOBJECT_OUTPUT_STREAM_H__
+#define __LIBVIRT_GOBJECT_OUTPUT_STREAM_H__
+
+#include <gio/gio.h>
+#include "libvirt-gobject-stream.h"
+
+G_BEGIN_DECLS
+
+#define GVIR_TYPE_OUTPUT_STREAM                          (_gvir_output_stream_get_type ())
+#define GVIR_OUTPUT_STREAM(inst)                         (G_TYPE_CHECK_INSTANCE_CAST ((inst), \
+                                                         GVIR_TYPE_OUTPUT_STREAM, GVirOutputStream))
+#define GVIR_OUTPUT_STREAM_CLASS(class)                  (G_TYPE_CHECK_CLASS_CAST ((class), \
+                                                         GVIR_TYPE_OUTPUT_STREAM, GVirOutputStreamClass))
+#define GVIR_IS_OUTPUT_STREAM(inst)                      (G_TYPE_CHECK_INSTANCE_TYPE ((inst), \
+                                                         GVIR_TYPE_OUTPUT_STREAM))
+#define GVIR_IS_OUTPUT_STREAM_CLASS(class)               (G_TYPE_CHECK_CLASS_TYPE ((class), \
+                                                         GVIR_TYPE_OUTPUT_STREAM))
+#define GVIR_OUTPUT_STREAM_GET_CLASS(inst)               (G_TYPE_INSTANCE_GET_CLASS ((inst), \
+                                                         GVIR_TYPE_OUTPUT_STREAM, GVirOutputStreamClass))
+
+typedef struct _GVirOutputStreamPrivate                   GVirOutputStreamPrivate; /* defined in the .c file */
+typedef struct _GVirOutputStreamClass                     GVirOutputStreamClass;
+typedef struct _GVirOutputStream                          GVirOutputStream;
+
+struct _GVirOutputStreamClass
+{
+    GOutputStreamClass parent_class; /* no members beyond the parent class */
+};
+
+struct _GVirOutputStream
+{
+    GOutputStream parent_instance;
+    GVirOutputStreamPrivate *priv; /* per-instance private state */
+};
+
+GType                _gvir_output_stream_get_type                 (void) G_GNUC_CONST;
+GVirOutputStream *   _gvir_output_stream_new                      (GVirStream *stream); /* output stream wrapping @stream */
+
+G_END_DECLS
+
+#endif /* __LIBVIRT_GOBJECT_OUTPUT_STREAM_H__ */
diff --git a/libvirt-gobject/libvirt-gobject-stream.c b/libvirt-gobject/libvirt-gobject-stream.c
index 30673aa..0d1c2d1 100644
--- a/libvirt-gobject/libvirt-gobject-stream.c
+++ b/libvirt-gobject/libvirt-gobject-stream.c
@@ -32,6 +32,7 @@
 #include "libvirt-gobject-compat.h"
 
 #include "libvirt-gobject/libvirt-gobject-input-stream.h"
+#include "libvirt-gobject/libvirt-gobject-output-stream.h"
 
 extern gboolean debugFlag;
 
@@ -44,7 +45,7 @@ struct _GVirStreamPrivate
 {
     virStreamPtr   handle;
     GInputStream  *input_stream;
-    gboolean       in_dispose;
+    GOutputStream  *output_stream;
 };
 
 G_DEFINE_TYPE(GVirStream, gvir_stream, G_TYPE_IO_STREAM);
@@ -77,6 +78,17 @@ static GInputStream* gvir_stream_get_input_stream(GIOStream *io_stream)
 }
 
 
+/* GIOStream get_output_stream vfunc: creates the output side on first use. */
+static GOutputStream* gvir_stream_get_output_stream(GIOStream *io_stream)
+{
+    GVirStream *self = GVIR_STREAM(io_stream);
+    GVirStreamPrivate *priv = self->priv;
+    if (!priv->output_stream)
+        priv->output_stream = (GOutputStream *)_gvir_output_stream_new(self);
+    return priv->output_stream;
+}
+
+
 static gboolean gvir_stream_close(GIOStream *io_stream,
                                   GCancellable *cancellable, G_GNUC_UNUSED GError **error)
 {
@@ -85,8 +97,8 @@ static gboolean gvir_stream_close(GIOStream *io_stream,
     if (self->priv->input_stream)
         g_input_stream_close(self->priv->input_stream, cancellable, NULL);
 
-    if (self->priv->in_dispose)
-        return TRUE;
+    if (self->priv->output_stream)
+        g_output_stream_close(self->priv->output_stream, cancellable, NULL);
 
     return TRUE; /* FIXME: really close the stream? */
 }
@@ -201,6 +213,7 @@ static void gvir_stream_class_init(GVirStreamClass *klass)
     object_class->set_property = gvir_stream_set_property;
 
     stream_class->get_input_stream = gvir_stream_get_input_stream;
+    stream_class->get_output_stream = gvir_stream_get_output_stream;
     stream_class->close_fn = gvir_stream_close;
     stream_class->close_async = gvir_stream_close_async;
     stream_class->close_finish = gvir_stream_close_finish;
@@ -339,3 +352,99 @@ gvir_stream_receive_all(GVirStream *self, GVirStreamSinkFunc func, gpointer user
 
     return r;
 }
+
+
+/**
+ * gvir_stream_send:
+ * @stream: the stream
+ * @buffer: a buffer to write data from (which should be at least @size
+ *     bytes long).
+ * @size: the number of bytes you want to write to the stream
+ * @cancellable: (allow-none): a %GCancellable or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Send data (up to @size bytes) to a stream.
+ * On error a negative value is returned and @error is set accordingly.
+ *
+ * gvir_stream_send() can send any number of bytes, up to
+ * @size. If fewer than @size bytes have been sent, the remaining
+ * data should be passed to further calls to gvir_stream_send().
+ *
+ * If the data cannot be sent without blocking, a
+ * %G_IO_ERROR_WOULD_BLOCK error will be returned.
+ *
+ * Returns: Number of bytes sent, or a negative value on error
+ * (-2 if the send would block).
+ */
+gssize gvir_stream_send(GVirStream *self, const gchar *buffer, gsize size,
+                        GCancellable *cancellable, GError **error)
+{
+    int got;
+
+    g_return_val_if_fail(GVIR_IS_STREAM(self), -1);
+    g_return_val_if_fail(buffer != NULL, -1);
+
+    if (g_cancellable_set_error_if_cancelled (cancellable, error))
+        return -1;
+
+    got = virStreamSend(self->priv->handle, buffer, size);
+    if (got == -2) {  /* blocking */
+        /* A NULL format is not valid for g_set_error(); use a literal. */
+        g_set_error_literal(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+                            "Stream send would block");
+    } else if (got < 0) {
+        g_set_error(error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
+                    "Got virStreamSend error in %s", G_STRFUNC);
+    }
+    return got;
+}
+
+struct stream_source_helper { /* context handed to stream_source() by gvir_stream_send_all() */
+    GVirStream *self;           /* stream passed back to @func */
+    GVirStreamSourceFunc func;  /* user callback supplying the data */
+    gpointer user_data;         /* opaque pointer forwarded to @func */
+};
+
+/* virStreamSendAll() source adapter: forwards to the user's GVirStreamSourceFunc. */
+static int
+stream_source(virStreamPtr st G_GNUC_UNUSED, char *bytes, size_t nbytes, void *opaque)
+{
+    struct stream_source_helper *ctx = opaque;
+
+    return ctx->func(ctx->self, bytes, nbytes, ctx->user_data);
+}
+
+/**
+ * gvir_stream_send_all:
+ * @stream: the stream
+ * @func: (scope notified): the callback for reading data from application
+ * @user_data: (closure): data to be passed to @func
+ * @error: return location for a #GError, or %NULL to ignore
+ * Returns: the result of virStreamSendAll(), which is negative upon error
+ *
+ * Send the entire data stream, reading the data from the requested data
+ * source. A convenient alternative to virStreamSend for blocking-I/O apps.
+ */
+gssize
+gvir_stream_send_all(GVirStream *self, GVirStreamSourceFunc func, gpointer user_data, GError **err)
+{
+    struct stream_source_helper helper = {
+        .self = self,
+        .func = func,
+        .user_data = user_data
+    };
+    int r;
+
+    g_return_val_if_fail(GVIR_IS_STREAM(self), -1);
+    g_return_val_if_fail(func != NULL, -1);
+
+    r = virStreamSendAll(self->priv->handle, stream_source, &helper);
+    if (r < 0) {
+        if (err != NULL)
+            *err = gvir_error_new_literal(GVIR_STREAM_ERROR,
+                                          0,
+                                          "Unable to perform SendAll");
+    }
+
+    return r;
+}
diff --git a/libvirt-gobject/libvirt-gobject-stream.h b/libvirt-gobject/libvirt-gobject-stream.h
index 35526db..5a1ee68 100644
--- a/libvirt-gobject/libvirt-gobject-stream.h
+++ b/libvirt-gobject/libvirt-gobject-stream.h
@@ -71,17 +71,34 @@ struct _GVirStreamClass
  * Returns: the number of bytes filled, 0 upon end
  * of file, or -1 upon error
  */
-typedef gint (* GVirStreamSinkFunc) (GVirStream *stream,
-                                     const gchar *buf,
-                                     gsize nbytes,
-                                     gpointer user_data);
+typedef gint (* GVirStreamSinkFunc)(GVirStream *stream,
+                                    const gchar *buf,
+                                    gsize nbytes,
+                                    gpointer user_data);
+
+/**
+ * GVirStreamSourceFunc:
+ * @stream: a #GVirStream
+ * @buf: (out) (array length=nbytes) (transfer none): buffer to fill with data
+ * @nbytes: size of @buf, i.e. the most bytes that may be filled
+ * @user_data: user data passed to the function
+ * Returns: the number of bytes filled, 0 upon end
+ * of file, or -1 upon error
+ */
+typedef gint (* GVirStreamSourceFunc)(GVirStream *stream,
+                                      gchar *buf,
+                                      gsize nbytes,
+                                      gpointer user_data);
 
 GType gvir_stream_get_type(void);
 GType gvir_stream_handle_get_type(void);
 
-gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer user_data, GError **err);
+gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer user_data, GError **error);
 gssize gvir_stream_receive(GVirStream *stream, gchar *buffer, gsize size, GCancellable *cancellable, GError **error);
 
+gssize gvir_stream_send_all(GVirStream *stream, GVirStreamSourceFunc func, gpointer user_data, GError **error);
+gssize gvir_stream_send(GVirStream *stream, const gchar *buffer, gsize size, GCancellable *cancellable, GError **error);
+
 G_END_DECLS
 
 #endif /* __LIBVIRT_GOBJECT_STREAM_H__ */
-- 
1.7.6.4




More information about the libvir-list mailing list