[libvirt] [libvirt-glib] Turn GVirStream into a GIOStream

Marc-André Lureau marcandre.lureau at gmail.com
Sat Oct 1 17:57:46 UTC 2011


Allows to read async from a stream with GVirInputStream.
This is modelled after GSocket.
---
 libvirt-gobject/Makefile.am                    |    2 +
 libvirt-gobject/libvirt-gobject-connection.c   |    2 +-
 libvirt-gobject/libvirt-gobject-input-stream.c |  239 ++++++++++++++++++++++++
 libvirt-gobject/libvirt-gobject-input-stream.h |   68 +++++++
 libvirt-gobject/libvirt-gobject-stream.c       |  130 +++++++++++++-
 libvirt-gobject/libvirt-gobject-stream.h       |   10 +-
 6 files changed, 443 insertions(+), 8 deletions(-)
 create mode 100644 libvirt-gobject/libvirt-gobject-input-stream.c
 create mode 100644 libvirt-gobject/libvirt-gobject-input-stream.h

diff --git a/libvirt-gobject/Makefile.am b/libvirt-gobject/Makefile.am
index 8147db2..7013675 100644
--- a/libvirt-gobject/Makefile.am
+++ b/libvirt-gobject/Makefile.am
@@ -40,6 +40,8 @@ libvirt_gobject_1_0_la_HEADERS = \
 libvirt_gobject_1_0_la_SOURCES = \
 			$(libvirt_gobject_1_0_la_HEADERS) \
 			libvirt-gobject-enums.c \
+			libvirt-gobject-input-stream.c \
+			libvirt-gobject-input-stream.h \
 			$(GOBJECT_SOURCE_FILES)
 libvirt_gobject_1_0_la_CFLAGS = \
 			-DDATADIR="\"$(datadir)\"" \
diff --git a/libvirt-gobject/libvirt-gobject-connection.c b/libvirt-gobject/libvirt-gobject-connection.c
index 5fc0a9e..95cd878 100644
--- a/libvirt-gobject/libvirt-gobject-connection.c
+++ b/libvirt-gobject/libvirt-gobject-connection.c
@@ -1151,7 +1151,7 @@ GVirStream *gvir_connection_get_stream(GVirConnection *self,
     klass = GVIR_CONNECTION_GET_CLASS(self);
     g_return_val_if_fail(klass->stream_new, NULL);
 
-    virStreamPtr st = virStreamNew(self->priv->conn, flags);
+    virStreamPtr st = virStreamNew(self->priv->conn, flags | VIR_STREAM_NONBLOCK);
 
     return klass->stream_new(self, st);
 }
diff --git a/libvirt-gobject/libvirt-gobject-input-stream.c b/libvirt-gobject/libvirt-gobject-input-stream.c
new file mode 100644
index 0000000..a76d670
--- /dev/null
+++ b/libvirt-gobject/libvirt-gobject-input-stream.c
@@ -0,0 +1,239 @@
+/*
+ * libvirt-gobject-input-stream.h: libvirt gobject integration
+ *
+ * Copyright (C) 2011 Red Hat
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Authors: Daniel P. Berrange <berrange at redhat.com>
+ *          Marc-André Lureau <marcandre.lureau at redhat.com>
+ */
+
+#include <config.h>
+
+#include <libvirt/virterror.h>
+#include <string.h>
+
+#include "libvirt-glib/libvirt-glib.h"
+#include "libvirt-gobject/libvirt-gobject.h"
+#include "libvirt-gobject-input-stream.h"
+
+extern gboolean debugFlag;
+
+#define DEBUG(fmt, ...) do { if (G_UNLIKELY(debugFlag)) g_debug(fmt, ## __VA_ARGS__); } while (0)
+
+#define gvir_input_stream_get_type _gvir_input_stream_get_type
+G_DEFINE_TYPE(GVirInputStream, gvir_input_stream, G_TYPE_INPUT_STREAM);
+
+enum
+{
+    PROP_0,
+    PROP_STREAM
+};
+
+struct _GVirInputStreamPrivate
+{
+    GVirStream *stream;
+
+    /* pending operation metadata */
+    GSimpleAsyncResult *result;
+    GCancellable *cancellable;
+    gpointer buffer;
+    gsize count;
+};
+
+static void gvir_input_stream_get_property(GObject    *object,
+                                           guint       prop_id,
+                                           GValue     *value,
+                                           GParamSpec *pspec)
+{
+    GVirInputStream *stream = GVIR_INPUT_STREAM(object);
+
+    switch (prop_id) {
+    case PROP_STREAM:
+        g_value_set_object(value, stream->priv->stream);
+        break;
+
+    default:
+        G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+    }
+}
+
+static void gvir_input_stream_set_property(GObject      *object,
+                                           guint         prop_id,
+                                           const GValue *value,
+                                           GParamSpec   *pspec)
+{
+    GVirInputStream *stream = GVIR_INPUT_STREAM(object);
+
+    switch (prop_id) {
+    case PROP_STREAM:
+        stream->priv->stream = g_value_dup_object(value);
+        break;
+
+    default:
+        G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+    }
+}
+
+static void gvir_input_stream_finalize(GObject *object)
+{
+    GVirInputStream *stream = GVIR_INPUT_STREAM(object);
+
+    if (stream->priv->stream)
+        g_object_unref(stream->priv->stream);
+
+    if (G_OBJECT_CLASS(gvir_input_stream_parent_class)->finalize)
+        (*G_OBJECT_CLASS(gvir_input_stream_parent_class)->finalize)(object);
+}
+
+static void
+gvir_input_stream_read_ready (G_GNUC_UNUSED virStreamPtr st,
+                              int events, void *opaque)
+{
+    GVirInputStream *stream = GVIR_INPUT_STREAM(opaque);
+    GVirInputStreamPrivate *priv = stream->priv;
+    GSimpleAsyncResult *simple;
+    GError *error = NULL;
+    gssize result;
+
+    g_return_if_fail(events & VIR_STREAM_EVENT_READABLE);
+
+    result  = gvir_stream_receive(priv->stream, priv->buffer, priv->count,
+                                  priv->cancellable, &error);
+
+    if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+        g_warn_if_reached();
+        return;
+    }
+
+    simple = stream->priv->result;
+    stream->priv->result = NULL;
+
+    if (result >= 0)
+        g_simple_async_result_set_op_res_gssize(simple, result);
+
+    if (error)
+        g_simple_async_result_take_error(simple, error);
+
+    if (priv->cancellable) {
+        g_object_unref(stream->priv->cancellable);
+        priv->cancellable = NULL;
+    }
+
+    g_simple_async_result_complete(simple);
+    g_object_unref(simple);
+
+    return;
+}
+
+static void gvir_input_stream_read_async(GInputStream        *stream,
+                                         void                *buffer,
+                                         gsize                count,
+                                         G_GNUC_UNUSED int    io_priority,
+                                         GCancellable        *cancellable,
+                                         GAsyncReadyCallback  callback,
+                                         gpointer             user_data)
+{
+    GVirInputStream *input_stream = GVIR_INPUT_STREAM(stream);
+    virStreamPtr handle;
+
+    g_return_if_fail(GVIR_IS_INPUT_STREAM(stream));
+    g_return_if_fail(input_stream->priv->result == NULL);
+
+    g_object_get(input_stream->priv->stream, "handle", &handle, NULL);
+
+    if (virStreamEventAddCallback(handle, VIR_STREAM_EVENT_READABLE,
+                                  gvir_input_stream_read_ready, stream, NULL) < 0) {
+        g_simple_async_report_error_in_idle(G_OBJECT(stream),
+                                            callback,
+                                            user_data,
+                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
+                                            "Couldn't add event callback %s",
+                                            G_STRFUNC);
+        goto end;
+    }
+
+    input_stream->priv->result =
+        g_simple_async_result_new(G_OBJECT(stream), callback, user_data,
+                                  gvir_input_stream_read_async);
+    if (cancellable)
+        g_object_ref(cancellable);
+    input_stream->priv->cancellable = cancellable;
+    input_stream->priv->buffer = buffer;
+    input_stream->priv->count = count;
+
+end:
+    virStreamFree(handle);
+}
+
+
+static gssize gvir_input_stream_read_finish(GInputStream  *stream,
+                                            GAsyncResult  *result,
+                                            G_GNUC_UNUSED GError **error)
+{
+    GVirInputStream *input_stream = GVIR_INPUT_STREAM(stream);
+    GSimpleAsyncResult *simple;
+    virStreamPtr handle;
+    gssize count;
+
+    g_return_val_if_fail(GVIR_IS_INPUT_STREAM(stream), -1);
+    g_object_get(input_stream->priv->stream, "handle", &handle, NULL);
+
+    simple = G_SIMPLE_ASYNC_RESULT(result);
+
+    g_warn_if_fail(g_simple_async_result_get_source_tag(simple) == gvir_input_stream_read_async);
+
+    count = g_simple_async_result_get_op_res_gssize(simple);
+
+    virStreamEventRemoveCallback(handle);
+    virStreamFree(handle);
+
+    return count;
+}
+
+
+static void gvir_input_stream_class_init(GVirInputStreamClass *klass)
+{
+    GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
+    GInputStreamClass *ginputstream_class = G_INPUT_STREAM_CLASS(klass);
+
+    g_type_class_add_private(klass, sizeof(GVirInputStreamPrivate));
+
+    gobject_class->finalize = gvir_input_stream_finalize;
+    gobject_class->get_property = gvir_input_stream_get_property;
+    gobject_class->set_property = gvir_input_stream_set_property;
+
+    ginputstream_class->read_fn = NULL;
+    ginputstream_class->read_async = gvir_input_stream_read_async;
+    ginputstream_class->read_finish = gvir_input_stream_read_finish;
+
+    g_object_class_install_property(gobject_class, PROP_STREAM,
+                                    g_param_spec_object("stream",
+                                                        "stream",
+                                                        "GVirStream",
+                                                        GVIR_TYPE_STREAM, G_PARAM_CONSTRUCT_ONLY |
+                                                        G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+}
+
+static void gvir_input_stream_init(GVirInputStream *stream)
+{
+    stream->priv = G_TYPE_INSTANCE_GET_PRIVATE(stream, GVIR_TYPE_INPUT_STREAM, GVirInputStreamPrivate);
+}
+
+GVirInputStream* _gvir_input_stream_new(GVirStream *stream)
+{
+    return GVIR_INPUT_STREAM(g_object_new(GVIR_TYPE_INPUT_STREAM, "stream", stream, NULL));
+}
diff --git a/libvirt-gobject/libvirt-gobject-input-stream.h b/libvirt-gobject/libvirt-gobject-input-stream.h
new file mode 100644
index 0000000..e8002b9
--- /dev/null
+++ b/libvirt-gobject/libvirt-gobject-input-stream.h
@@ -0,0 +1,68 @@
+/*
+ * libvirt-gobject-input-stream.h: libvirt gobject integration
+ *
+ * Copyright (C) 2011 Red Hat
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Authors: Daniel P. Berrange <berrange at redhat.com>
+ *          Marc-André Lureau <marcandre.lureau at redhat.com>
+ */
+
+#if !defined(__LIBVIRT_GOBJECT_H__) && !defined(LIBVIRT_GOBJECT_BUILD)
+#error "Only <libvirt-gobject/libvirt-gobject.h> can be included directly."
+#endif
+
+#ifndef __LIBVIRT_GOBJECT_INPUT_STREAM_H__
+#define __LIBVIRT_GOBJECT_INPUT_STREAM_H__
+
+#include <gio/gio.h>
+#include "libvirt-gobject-stream.h"
+
+G_BEGIN_DECLS
+
+#define GVIR_TYPE_INPUT_STREAM                          (_gvir_input_stream_get_type ())
+#define GVIR_INPUT_STREAM(inst)                         (G_TYPE_CHECK_INSTANCE_CAST ((inst), \
+                                                         GVIR_TYPE_INPUT_STREAM, GVirInputStream))
+#define GVIR_INPUT_STREAM_CLASS(class)                  (G_TYPE_CHECK_CLASS_CAST ((class), \
+                                                         GVIR_TYPE_INPUT_STREAM, GVirInputStreamClass))
+#define GVIR_IS_INPUT_STREAM(inst)                      (G_TYPE_CHECK_INSTANCE_TYPE ((inst), \
+                                                         GVIR_TYPE_INPUT_STREAM))
+#define GVIR_IS_INPUT_STREAM_CLASS(class)               (G_TYPE_CHECK_CLASS_TYPE ((class), \
+                                                         GVIR_TYPE_INPUT_STREAM))
+#define GVIR_INPUT_STREAM_GET_CLASS(inst)               (G_TYPE_INSTANCE_GET_CLASS ((inst), \
+                                                         GVIR_TYPE_INPUT_STREAM, GVirInputStreamClass))
+
+typedef struct _GVirInputStreamPrivate                   GVirInputStreamPrivate;
+typedef struct _GVirInputStreamClass                     GVirInputStreamClass;
+typedef struct _GVirInputStream                          GVirInputStream;
+
+struct _GVirInputStreamClass
+{
+    GInputStreamClass parent_class;
+};
+
+struct _GVirInputStream
+{
+    GInputStream parent_instance;
+    GVirInputStreamPrivate *priv;
+};
+
+GType                _gvir_input_stream_get_type                 (void) G_GNUC_CONST;
+GVirInputStream *    _gvir_input_stream_new                      (GVirStream *stream);
+
+G_END_DECLS
+
+#endif /* __LIBVIRT_GOBJECT_INPUT_STREAM_H__ */
diff --git a/libvirt-gobject/libvirt-gobject-stream.c b/libvirt-gobject/libvirt-gobject-stream.c
index 519d733..88e3a40 100644
--- a/libvirt-gobject/libvirt-gobject-stream.c
+++ b/libvirt-gobject/libvirt-gobject-stream.c
@@ -30,6 +30,8 @@
 #include "libvirt-glib/libvirt-glib.h"
 #include "libvirt-gobject/libvirt-gobject.h"
 
+#include "libvirt-gobject/libvirt-gobject-input-stream.h"
+
 extern gboolean debugFlag;
 
 #define DEBUG(fmt, ...) do { if (G_UNLIKELY(debugFlag)) g_debug(fmt, ## __VA_ARGS__); } while (0)
@@ -39,10 +41,12 @@ extern gboolean debugFlag;
 
 struct _GVirStreamPrivate
 {
-    virStreamPtr handle;
+    virStreamPtr   handle;
+    GInputStream  *input_stream;
+    gboolean       in_dispose;
 };
 
-G_DEFINE_TYPE(GVirStream, gvir_stream, G_TYPE_OBJECT);
+G_DEFINE_TYPE(GVirStream, gvir_stream, G_TYPE_IO_STREAM);
 
 
 enum {
@@ -60,6 +64,71 @@ gvir_stream_error_quark(void)
     return g_quark_from_static_string("vir-g-stream");
 }
 
+
+static GInputStream* gvir_stream_get_input_stream(GIOStream *io_stream)
+{
+    GVirStream *self = GVIR_STREAM(io_stream);
+
+    if (self->priv->input_stream == NULL)
+        self->priv->input_stream = (GInputStream *)_gvir_input_stream_new(self);
+
+    return self->priv->input_stream;
+}
+
+
+static gboolean gvir_stream_close(GIOStream *io_stream,
+                                  GCancellable *cancellable, G_GNUC_UNUSED GError **error)
+{
+    GVirStream *self = GVIR_STREAM(io_stream);
+
+    if (self->priv->input_stream)
+        g_input_stream_close(self->priv->input_stream, cancellable, NULL);
+
+    if (self->priv->in_dispose)
+        return TRUE;
+
+    return TRUE; /* FIXME: really close the stream? */
+}
+
+
+static void gvir_stream_close_async(GIOStream *stream, G_GNUC_UNUSED int io_priority,
+                                    GCancellable *cancellable, GAsyncReadyCallback callback,
+                                    gpointer user_data)
+{
+    GSimpleAsyncResult *res;
+    GIOStreamClass *class;
+    GError *error;
+
+    class = G_IO_STREAM_GET_CLASS(stream);
+
+    /* close is not blocked, just do it! */
+    error = NULL;
+    if (class->close_fn &&
+        !class->close_fn(stream, cancellable, &error)) {
+        g_simple_async_report_take_gerror_in_idle(G_OBJECT (stream),
+                                                  callback, user_data,
+                                                  error);
+        return;
+    }
+
+    res = g_simple_async_result_new(G_OBJECT (stream),
+                                    callback,
+                                    user_data,
+                                    gvir_stream_close_async);
+    g_simple_async_result_complete_in_idle(res);
+    g_object_unref (res);
+}
+
+
+static gboolean
+gvir_stream_close_finish(G_GNUC_UNUSED GIOStream *stream,
+                         G_GNUC_UNUSED GAsyncResult *result,
+                         G_GNUC_UNUSED GError **error)
+{
+    return TRUE;
+}
+
+
 static void gvir_stream_get_property(GObject *object,
                                      guint prop_id,
                                      GValue *value,
@@ -107,6 +176,9 @@ static void gvir_stream_finalize(GObject *object)
 
     DEBUG("Finalize GVirStream=%p", self);
 
+    if (self->priv->input_stream)
+        g_object_unref(self->priv->input_stream);
+
     if (priv->handle) {
         if (virStreamFinish(priv->handle) < 0)
             g_critical("cannot finish stream");
@@ -120,12 +192,18 @@ static void gvir_stream_finalize(GObject *object)
 
 static void gvir_stream_class_init(GVirStreamClass *klass)
 {
-    GObjectClass *object_class = G_OBJECT_CLASS (klass);
+    GObjectClass *object_class = G_OBJECT_CLASS(klass);
+    GIOStreamClass *stream_class = G_IO_STREAM_CLASS(klass);
 
     object_class->finalize = gvir_stream_finalize;
     object_class->get_property = gvir_stream_get_property;
     object_class->set_property = gvir_stream_set_property;
 
+    stream_class->get_input_stream = gvir_stream_get_input_stream;
+    stream_class->close_fn = gvir_stream_close;
+    stream_class->close_async = gvir_stream_close_async;
+    stream_class->close_finish = gvir_stream_close_finish;
+
     g_object_class_install_property(object_class,
                                     PROP_HANDLE,
                                     g_param_spec_boxed("handle",
@@ -170,6 +248,50 @@ GType gvir_stream_handle_get_type(void)
     return handle_type;
 }
 
+/**
+ * gvir_stream_receive:
+ * @stream: the stream
+ * @buffer: a buffer to read data into (which should be at least @size
+ *     bytes long).
+ * @size: the number of bytes you want to read from the stream
+ * @cancellable: (allow-none): a %GCancellable or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Receive data (up to @size bytes) from a stream.
+ * On error -1 is returned and @error is set accordingly.
+ *
+ * gvir_stream_receive() can return any number of bytes, up to
+ * @size. If more than @size bytes have been received, the additional
+ * data will be returned in future calls to gvir_stream_receive().
+ *
+ * If there is no data available, a %G_IO_ERROR_WOULD_BLOCK error will be
+ * returned.
+ *
+ * Returns: Number of bytes read, or 0 if the end of stream reached,
+ * or -1 on error.
+ */
+gssize gvir_stream_receive(GVirStream *self, gchar *buffer, gsize size,
+                           GCancellable *cancellable, GError **error)
+{
+    int got;
+
+    g_return_val_if_fail(GVIR_IS_STREAM(self), -1);
+    g_return_val_if_fail(buffer != NULL, -1);
+
+    if (g_cancellable_set_error_if_cancelled (cancellable, error))
+        return -1;
+
+    got = virStreamRecv(self->priv->handle, buffer, size);
+
+    if (got == -2) {  /* blocking */
+        g_set_error(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, NULL);
+    } else if (got < 0) {
+        g_set_error(error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
+                    "Got virStreamRecv error in %s", G_STRFUNC);
+    }
+
+    return got;
+}
 
 struct stream_sink_helper {
     GVirStream *self;
@@ -197,7 +319,7 @@ stream_sink(virStreamPtr st G_GNUC_UNUSED,
  * requested data sink. This is simply a convenient alternative
  * to virStreamRecv, for apps that do blocking-I/o.
  */
-gint
+gssize
 gvir_stream_receive_all(GVirStream *self, GVirStreamSinkFunc func, gpointer user_data, GError **err)
 {
     struct stream_sink_helper helper = {
diff --git a/libvirt-gobject/libvirt-gobject-stream.h b/libvirt-gobject/libvirt-gobject-stream.h
index 5181e24..35526db 100644
--- a/libvirt-gobject/libvirt-gobject-stream.h
+++ b/libvirt-gobject/libvirt-gobject-stream.h
@@ -28,6 +28,9 @@
 #ifndef __LIBVIRT_GOBJECT_STREAM_H__
 #define __LIBVIRT_GOBJECT_STREAM_H__
 
+#include <glib-object.h>
+#include <gio/gio.h>
+
 G_BEGIN_DECLS
 
 #define GVIR_TYPE_STREAM            (gvir_stream_get_type ())
@@ -45,7 +48,7 @@ typedef struct _GVirStreamClass GVirStreamClass;
 
 struct _GVirStream
 {
-    GObject parent;
+    GIOStream parent_instance;
 
     GVirStreamPrivate *priv;
 
@@ -54,7 +57,7 @@ struct _GVirStream
 
 struct _GVirStreamClass
 {
-    GObjectClass parent_class;
+    GIOStreamClass parent_class;
 
     gpointer padding[20];
 };
@@ -76,7 +79,8 @@ typedef gint (* GVirStreamSinkFunc) (GVirStream *stream,
 GType gvir_stream_get_type(void);
 GType gvir_stream_handle_get_type(void);
 
-gint gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer user_data, GError **err);
+gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer user_data, GError **err);
+gssize gvir_stream_receive(GVirStream *stream, gchar *buffer, gsize size, GCancellable *cancellable, GError **error);
 
 G_END_DECLS
 
-- 
1.7.6.2




More information about the libvir-list mailing list