[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[fedora-virt] [PATCH] Remove receive callbacks



This is a repost of my previous patch, rebased against the current head.

Matt
--
Matthew Booth, RHCA, RHCSS
Red Hat Engineering, Virtualisation Team

M:       +44 (0)7977 267231
GPG ID:  D33C3490
GPG FPR: 3733 612D 2D05 5458 8A8A 1600 3441 EA19 D33C 3490
From 5f57439a6139fa03560cb3a5351eedc2ebe42e19 Mon Sep 17 00:00:00 2001
From: Matthew Booth <mbooth redhat com>
Date: Sat, 27 Jun 2009 22:05:48 +0100
Subject: [PATCH] Remove receive callbacks

This patch fixes a class of race conditions characterised by the
following sequence of events:

LIBRARY                                 DAEMON
send download request
                                        receive download request
                                        respond with download response
                                        start sending file chunks
set reply callback to 'download'
run main loop

At this stage the download reply callback receives both the download
reply and some file chunks. The current architecture doesn't provide a clean way
to prevent this from happening.

This patch fixes the above problem by changing the socket receive
handler to do nothing but buffer incoming data, and by providing two
new APIs:

guestfs_get_reply
guestfs_free_reply

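Used together, these always de-queue exactly one message at a time:
guestfs_get_reply returns the next complete message from the buffer and
guestfs_free_reply releases it. This is always what is wanted.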
---
 src/generator.ml |  211 ++++++++----------
 src/guestfs.c    |  672 +++++++++++++++++++++++++-----------------------------
 src/guestfs.h    |   13 +-
 3 files changed, 417 insertions(+), 479 deletions(-)

diff --git a/src/generator.ml b/src/generator.ml
index c65e717..c64b8c7 100755
--- a/src/generator.ml
+++ b/src/generator.ml
@@ -3734,83 +3734,6 @@ check_state (guestfs_h *g, const char *caller)
   List.iter (
     fun (shortname, style, _, _, _, _, _) ->
       let name = "guestfs_" ^ shortname in
-
-      (* Generate the context struct which stores the high-level
-       * state between callback functions.
-       *)
-      pr "struct %s_ctx {\n" shortname;
-      pr "  /* This flag is set by the callbacks, so we know we've done\n";
-      pr "   * the callbacks as expected, and in the right sequence.\n";
-      pr "   * 0 = not called, 1 = reply_cb called.\n";
-      pr "   */\n";
-      pr "  int cb_sequence;\n";
-      pr "  struct guestfs_message_header hdr;\n";
-      pr "  struct guestfs_message_error err;\n";
-      (match fst style with
-       | RErr -> ()
-       | RConstString _ ->
-	   failwithf "RConstString cannot be returned from a daemon function"
-       | RInt _ | RInt64 _
-       | RBool _ | RString _ | RStringList _
-       | RIntBool _
-       | RPVList _ | RVGList _ | RLVList _
-       | RStat _ | RStatVFS _
-       | RHashtable _
-       | RDirentList _ ->
-	   pr "  struct %s_ret ret;\n" name
-      );
-      pr "};\n";
-      pr "\n";
-
-      (* Generate the reply callback function. *)
-      pr "static void %s_reply_cb (guestfs_h *g, void *data, XDR *xdr)\n" shortname;
-      pr "{\n";
-      pr "  guestfs_main_loop *ml = guestfs_get_main_loop (g);\n";
-      pr "  struct %s_ctx *ctx = (struct %s_ctx *) data;\n" shortname shortname;
-      pr "\n";
-      pr "  /* This should definitely not happen. */\n";
-      pr "  if (ctx->cb_sequence != 0) {\n";
-      pr "    ctx->cb_sequence = 9999;\n";
-      pr "    error (g, \"%%s: internal error: reply callback called twice\", \"%s\");\n" name;
-      pr "    return;\n";
-      pr "  }\n";
-      pr "\n";
-      pr "  ml->main_loop_quit (ml, g);\n";
-      pr "\n";
-      pr "  if (!xdr_guestfs_message_header (xdr, &ctx->hdr)) {\n";
-      pr "    error (g, \"%%s: failed to parse reply header\", \"%s\");\n" name;
-      pr "    return;\n";
-      pr "  }\n";
-      pr "  if (ctx->hdr.status == GUESTFS_STATUS_ERROR) {\n";
-      pr "    if (!xdr_guestfs_message_error (xdr, &ctx->err)) {\n";
-      pr "      error (g, \"%%s: failed to parse reply error\", \"%s\");\n"
-	name;
-      pr "      return;\n";
-      pr "    }\n";
-      pr "    goto done;\n";
-      pr "  }\n";
-
-      (match fst style with
-       | RErr -> ()
-       | RConstString _ ->
-	   failwithf "RConstString cannot be returned from a daemon function"
-       | RInt _ | RInt64 _
-       | RBool _ | RString _ | RStringList _
-       | RIntBool _
-       | RPVList _ | RVGList _ | RLVList _
-       | RStat _ | RStatVFS _
-       | RHashtable _
-       | RDirentList _ ->
-	    pr "  if (!xdr_%s_ret (xdr, &ctx->ret)) {\n" name;
-	    pr "    error (g, \"%%s: failed to parse reply\", \"%s\");\n" name;
-	    pr "    return;\n";
-	    pr "  }\n";
-      );
-
-      pr " done:\n";
-      pr "  ctx->cb_sequence = 1;\n";
-      pr "}\n\n";
-
       (* Generate the action stub. *)
       generate_prototype ~extern:false ~semicolon:false ~newline:true
 	~handle:"g" name style;
@@ -3834,15 +3757,27 @@ check_state (guestfs_h *g, const char *caller)
        | _ -> pr "  struct %s_args args;\n" name
       );
 
-      pr "  struct %s_ctx ctx;\n" shortname;
-      pr "  guestfs_main_loop *ml = guestfs_get_main_loop (g);\n";
+      pr "  struct guestfs_message_header hdr = {};\n";
+      pr "  struct guestfs_message_error err = {};\n";
+      (match fst style with
+       | RErr -> ()
+       | RConstString _ ->
+	   failwithf "RConstString cannot be returned from a daemon function"
+       | RInt _ | RInt64 _
+       | RBool _ | RString _ | RStringList _
+       | RIntBool _
+       | RPVList _ | RVGList _ | RLVList _
+       | RStat _ | RStatVFS _
+       | RHashtable _
+       | RDirentList _ ->
+	   pr "  struct %s_ret ret = {};\n" name
+      );
+
       pr "  int serial;\n";
       pr "\n";
       pr "  if (check_state (g, \"%s\") == -1) return %s;\n" name error_code;
       pr "  guestfs_set_busy (g);\n";
       pr "\n";
-      pr "  memset (&ctx, 0, sizeof ctx);\n";
-      pr "\n";
 
       (* Send the main header and arguments. *)
       (match snd style with
@@ -3877,7 +3812,6 @@ check_state (guestfs_h *g, const char *caller)
       pr "\n";
 
       (* Send any additional files (FileIn) requested. *)
-      let need_read_reply_label = ref false in
       List.iter (
 	function
 	| FileIn n ->
@@ -3889,83 +3823,130 @@ check_state (guestfs_h *g, const char *caller)
 	    pr "      guestfs_end_busy (g);\n";
 	    pr "      return %s;\n" error_code;
 	    pr "    }\n";
-	    pr "    if (r == -2) /* daemon cancelled */\n";
-	    pr "      goto read_reply;\n";
-	    need_read_reply_label := true;
 	    pr "  }\n";
 	    pr "\n";
 	| _ -> ()
       ) (snd style);
 
       (* Wait for the reply from the remote end. *)
-      if !need_read_reply_label then pr " read_reply:\n";
-      pr "  guestfs__switch_to_receiving (g);\n";
-      pr "  ctx.cb_sequence = 0;\n";
-      pr "  guestfs_set_reply_callback (g, %s_reply_cb, &ctx);\n" shortname;
-      pr "  (void) ml->main_loop_run (ml, g);\n";
-      pr "  guestfs_set_reply_callback (g, NULL, NULL);\n";
-      pr "  if (ctx.cb_sequence != 1) {\n";
-      pr "    error (g, \"%%s reply failed, see earlier error messages\", \"%s\");\n" name;
-      pr "    guestfs_end_busy (g);\n";
-      pr "    return %s;\n" error_code;
+      pr "  guestfs_reply_t reply;\n";
+      pr "\n";
+      pr "  for (;;) {\n";
+      pr "    guestfs_get_reply (g, &reply, 1);\n";
+      pr "\n";
+      pr "    if (GUESTFS_CANCEL_FLAG == reply.len) {\n";
+      pr "      /* This message was delayed from a previous file transaction. */\n";
+      pr "      continue;\n";
+      pr "    }\n";
+      pr "\n";
+      pr "    if (GUESTFS_LAUNCH_FLAG == reply.len) {\n";
+      pr "      error (g, \"%%s reply failed, received unexpected launch message\",\n";
+      pr "             \"%s\");\n" name;
+      pr "      guestfs_end_busy (g);\n";
+      pr "      return %s;\n" error_code;
+      pr "    }\n";
+      pr "\n";
+      pr "    if (0 == reply.len) {\n";
+      pr "      error (g, \"%%s reply failed, see earlier error messages\", \"%s\");\n" name;
+      pr "      guestfs_end_busy (g);\n";
+      pr "      return %s;\n" error_code;
+      pr "    }\n";
+      pr "\n";
+      pr "    break;\n";
       pr "  }\n";
       pr "\n";
 
-      pr "  if (check_reply_header (g, &ctx.hdr, GUESTFS_PROC_%s, serial) == -1) {\n"
+      pr "  if (!xdr_guestfs_message_header (&reply.xdr, &hdr)) {\n";
+      pr "    error (g, \"%%s: failed to parse reply header\", \"%s\");\n" name;
+      pr "    goto recv_error;\n";
+      pr "  }\n";
+      pr "\n";
+      pr "  if (hdr.status == GUESTFS_STATUS_ERROR) {\n";
+      pr "    if (!xdr_guestfs_message_error (&reply.xdr, &err)) {\n";
+      pr "      error (g, \"%%s: failed to parse reply error\", \"%s\");\n"
+	name;
+      pr "      goto recv_error;\n";
+      pr "    }\n";
+      pr "  }\n";
+
+      (match fst style with
+       | RErr -> ()
+       | RConstString _ ->
+	   failwithf "RConstString cannot be returned from a daemon function"
+       | RInt _ | RInt64 _
+       | RBool _ | RString _ | RStringList _
+       | RIntBool _
+       | RPVList _ | RVGList _ | RLVList _
+       | RStat _ | RStatVFS _
+       | RHashtable _
+       | RDirentList _ ->
+	    pr "  else if (!xdr_%s_ret (&reply.xdr, &ret)) {\n" name;
+	    pr "    error (g, \"%%s: failed to parse reply\", \"%s\");\n" name;
+	    pr "    goto recv_error;\n";
+	    pr "  }\n";
+      );
+
+      pr "  if (check_reply_header (g, &hdr, GUESTFS_PROC_%s, serial) == -1) {\n"
 	(String.uppercase shortname);
-      pr "    guestfs_end_busy (g);\n";
-      pr "    return %s;\n" error_code;
+      pr "    goto recv_error;\n";
       pr "  }\n";
       pr "\n";
 
-      pr "  if (ctx.hdr.status == GUESTFS_STATUS_ERROR) {\n";
-      pr "    error (g, \"%%s\", ctx.err.error_message);\n";
-      pr "    free (ctx.err.error_message);\n";
-      pr "    guestfs_end_busy (g);\n";
-      pr "    return %s;\n" error_code;
+      pr "  if (hdr.status == GUESTFS_STATUS_ERROR) {\n";
+      pr "    error (g, \"%%s\", err.error_message);\n";
+      pr "    free (err.error_message);\n";
+      pr "    goto recv_error;\n";
       pr "  }\n";
       pr "\n";
 
+      pr "  guestfs_free_reply (g, &reply);\n\n";
+
       (* Expecting to receive further files (FileOut)? *)
       List.iter (
 	function
 	| FileOut n ->
 	    pr "  if (guestfs__receive_file_sync (g, %s) == -1) {\n" n;
-	    pr "    guestfs_end_busy (g);\n";
-	    pr "    return %s;\n" error_code;
+            pr "    guestfs_end_busy (g);\n";
+            pr "    return %s;\n" error_code;
 	    pr "  }\n";
 	    pr "\n";
 	| _ -> ()
       ) (snd style);
 
-      pr "  guestfs_end_busy (g);\n";
+      pr "  guestfs_end_busy (g);\n\n";
 
       (match fst style with
        | RErr -> pr "  return 0;\n"
        | RInt n | RInt64 n | RBool n ->
-	   pr "  return ctx.ret.%s;\n" n
+	   pr "  return ret.%s;\n" n
        | RConstString _ ->
 	   failwithf "RConstString cannot be returned from a daemon function"
        | RString n ->
-	   pr "  return ctx.ret.%s; /* caller will free */\n" n
+	   pr "  return ret.%s; /* caller will free */\n" n
        | RStringList n | RHashtable n ->
 	   pr "  /* caller will free this, but we need to add a NULL entry */\n";
-	   pr "  ctx.ret.%s.%s_val =\n" n n;
-	   pr "    safe_realloc (g, ctx.ret.%s.%s_val,\n" n n;
-	   pr "                  sizeof (char *) * (ctx.ret.%s.%s_len + 1));\n"
+	   pr "  ret.%s.%s_val =\n" n n;
+	   pr "    safe_realloc (g, ret.%s.%s_val,\n" n n;
+	   pr "                  sizeof (char *) * (ret.%s.%s_len + 1));\n"
 	     n n;
-	   pr "  ctx.ret.%s.%s_val[ctx.ret.%s.%s_len] = NULL;\n" n n n n;
-	   pr "  return ctx.ret.%s.%s_val;\n" n n
+	   pr "  ret.%s.%s_val[ret.%s.%s_len] = NULL;\n" n n n n;
+	   pr "  return ret.%s.%s_val;\n" n n
        | RIntBool _ ->
 	   pr "  /* caller with free this */\n";
-	   pr "  return safe_memdup (g, &ctx.ret, sizeof (ctx.ret));\n"
+	   pr "  return safe_memdup (g, &ret, sizeof (ret));\n"
        | RPVList n | RVGList n | RLVList n
        | RStat n | RStatVFS n
        | RDirentList n ->
 	   pr "  /* caller will free this */\n";
-	   pr "  return safe_memdup (g, &ctx.ret.%s, sizeof (ctx.ret.%s));\n" n n
+	   pr "  return safe_memdup (g, &ret.%s, sizeof (ret.%s));\n" n n
       );
 
+      pr "\n";
+      pr " recv_error:\n";
+      pr "  guestfs_free_reply (g, &reply);\n";
+      pr "  guestfs_end_busy (g);\n";
+      pr "  return %s;\n" error_code;
+
       pr "}\n\n"
   ) daemon_functions
 
diff --git a/src/guestfs.c b/src/guestfs.c
index 350d848..79251ca 100644
--- a/src/guestfs.c
+++ b/src/guestfs.c
@@ -21,6 +21,7 @@
 #define _BSD_SOURCE /* for mkdtemp, usleep */
 #define _GNU_SOURCE /* for vasprintf, GNU strerror_r, strchrnul */
 
+#include <assert.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <stdarg.h>
@@ -78,8 +79,10 @@
 
 static void default_error_cb (guestfs_h *g, void *data, const char *msg);
 static void stdout_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data, int watch, int fd, int events);
-static void sock_read_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data, int watch, int fd, int events);
-static void sock_write_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data, int watch, int fd, int events);
+static void sock_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data, int watch, int fd, int events);
+static void sock_read (guestfs_h *g);
+static void sock_write (guestfs_h *g);
+static int sock_update_events (guestfs_h *g);
 
 static void close_handles (void);
 
@@ -161,6 +164,9 @@ struct guestfs_h
   int stdout_watch;		/* Watches qemu stdout for log messages. */
   int sock_watch;		/* Watches daemon comm socket. */
 
+  int sock_events;              /* events we're listening for on the comm
+                                   socket */
+
   char *tmpdir;			/* Temporary directory containing socket. */
 
   char *qemu_help, *qemu_version; /* Output of qemu -help, qemu -version. */
@@ -185,21 +191,18 @@ struct guestfs_h
   void *                     error_cb_data;
   guestfs_send_cb            send_cb;
   void *                     send_cb_data;
-  guestfs_reply_cb           reply_cb;
-  void *                     reply_cb_data;
   guestfs_log_message_cb     log_message_cb;
   void *                     log_message_cb_data;
   guestfs_subprocess_quit_cb subprocess_quit_cb;
   void *                     subprocess_quit_cb_data;
-  guestfs_launch_done_cb     launch_done_cb;
-  void *                     launch_done_cb_data;
 
   /* Main loop used by this handle. */
   guestfs_main_loop *main_loop;
 
   /* Messages sent and received from the daemon. */
   char *msg_in;
-  int msg_in_size, msg_in_allocated;
+  size_t msg_in_size, msg_in_pos, msg_in_consumed, msg_in_len;
+
   char *msg_out;
   int msg_out_size, msg_out_pos;
 
@@ -228,6 +231,8 @@ guestfs_create (void)
   g->stdout_watch = -1;
   g->sock_watch = -1;
 
+  g->sock_events = 0;
+
   g->abort_cb = abort;
   g->error_cb = default_error_cb;
   g->error_cb_data = NULL;
@@ -265,6 +270,11 @@ guestfs_create (void)
   } else
     g->memsize = 500;
 
+  /* Initialise the message receive buffer */
+  g->msg_in_size = GUESTFS_MESSAGE_MAX + sizeof (g->msg_in_len);
+  g->msg_in = safe_malloc (g, g->msg_in_size);
+  g->msg_in_pos = g->msg_in_consumed = 0;
+
   g->main_loop = guestfs_get_default_main_loop ();
 
   /* Start with large serial numbers so they are easy to spot
@@ -290,9 +300,10 @@ guestfs_create (void)
   return g;
 
  error:
-  free (g->path);
-  free (g->qemu);
-  free (g->append);
+  if (g->msg_in) free (g->msg_in);
+  if (g->path)   free (g->path);
+  if (g->qemu)   free (g->qemu);
+  if (g->append) free (g->append);
   free (g);
   return NULL;
 }
@@ -1159,10 +1170,6 @@ guestfs_launch (guestfs_h *g)
 
  connected:
   /* Watch the file descriptors. */
-  free (g->msg_in);
-  g->msg_in = NULL;
-  g->msg_in_size = g->msg_in_allocated = 0;
-
   free (g->msg_out);
   g->msg_out = NULL;
   g->msg_out_size = 0;
@@ -1177,7 +1184,9 @@ guestfs_launch (guestfs_h *g)
     goto cleanup3;
   }
 
-  if (guestfs__switch_to_receiving (g) == -1)
+  g->sock_events = GUESTFS_HANDLE_READABLE |
+                   GUESTFS_HANDLE_ERROR | GUESTFS_HANDLE_HANGUP;
+  if (sock_update_events (g) == -1)
     goto cleanup3;
 
   g->state = LAUNCHING;
@@ -1335,20 +1344,10 @@ qemu_supports (guestfs_h *g, const char *option)
   return g->qemu_help && strstr (g->qemu_help, option) != NULL;
 }
 
-static void
-finish_wait_ready (guestfs_h *g, void *vp)
-{
-  if (g->verbose)
-    fprintf (stderr, "finish_wait_ready called, %p, vp = %p\n", g, vp);
-
-  *((int *)vp) = 1;
-  g->main_loop->main_loop_quit (g->main_loop, g);
-}
-
 int
 guestfs_wait_ready (guestfs_h *g)
 {
-  int finished = 0, r;
+  guestfs_reply_t reply = {};
 
   if (g->state == READY) return 0;
 
@@ -1362,29 +1361,29 @@ guestfs_wait_ready (guestfs_h *g)
     return -1;
   }
 
-  g->launch_done_cb = finish_wait_ready;
-  g->launch_done_cb_data = &finished;
-  r = g->main_loop->main_loop_run (g->main_loop, g);
-  g->launch_done_cb = NULL;
-  g->launch_done_cb_data = NULL;
-
-  if (r == -1) return -1;
-
-  if (finished != 1) {
-    error (g, _("guestfs_wait_ready failed, see earlier error messages"));
+  guestfs_get_reply (g, &reply, 1);
+  if (0 == reply.len) {
+    error (g, _("guestfs_wait_ready: error receiving reply"));
+    return -1;
+  }
+  if (GUESTFS_LAUNCH_FLAG != reply.len) {
+    error (g, _("guestfs_wait_ready: received non-launch reply"));
     return -1;
   }
+  guestfs_free_reply (g, &reply);
 
   /* This is possible in some really strange situations, such as
    * guestfsd starts up OK but then qemu immediately exits.  Check for
    * it because the caller is probably expecting to be able to send
    * commands after this function returns.
    */
-  if (g->state != READY) {
-    error (g, _("qemu launched and contacted daemon, but state != READY"));
+  if (g->state != LAUNCHING) {
+    error (g, _("qemu launched and contacted daemon, but state != LAUNCHING"));
     return -1;
   }
 
+  g->state = READY;
+
   return 0;
 }
 
@@ -1517,9 +1516,9 @@ guestfs_free_dirent_list (struct guestfs_dirent_list *x)
   free (x);
 }
 
-/* We don't know if stdout_event or sock_read_event will be the
- * first to receive EOF if the qemu process dies.  This function
- * has the common cleanup code for both.
+/* We don't know if stdout_event or sock_read will be the first to receive EOF
+ * if the qemu process dies.  This function has the common cleanup code for
+ * both.
  */
 static void
 child_cleanup (guestfs_h *g)
@@ -1595,175 +1594,218 @@ stdout_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data,
     g->log_message_cb (g, g->log_message_cb_data, buf, n);
 }
 
-/* The function is called whenever we can read something on the
- * guestfsd (daemon inside the guest) communication socket.
- */
-static void
-sock_read_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data,
-		 int watch, int fd, int events)
+void
+guestfs_get_reply (guestfs_h *g, guestfs_reply_t *msg,
+                   unsigned char blocking)
 {
-  XDR xdr;
-  u_int32_t len;
-  int n;
+  msg->len = 0;
 
-  if (g->verbose)
-    fprintf (stderr,
-	     "sock_read_event: %p g->state = %d, fd = %d, events = 0x%x\n",
-	     g, g->state, fd, events);
-
-  if (g->sock != fd) {
-    error (g, _("sock_read_event: internal error: %d != %d"), g->sock, fd);
+  /* Not in the expected state. */
+  if (g->state != BUSY && g->state != LAUNCHING) {
+    error (g, _("state %d != BUSY && g->state != LAUNCHING"), g->state);
     return;
   }
 
-  if (g->msg_in_size <= g->msg_in_allocated) {
-    g->msg_in_allocated += 4096;
-    g->msg_in = safe_realloc (g, g->msg_in, g->msg_in_allocated);
+  /* Execute the main loop until we've received at least 1 complete message */
+  for (;;) {
+    struct guestfs_main_loop *ml = guestfs_get_main_loop(g);
+
+    size_t available = g->msg_in_pos - g->msg_in_consumed;
+
+    if (available >= sizeof (g->msg_in_len)) {
+      xdrmem_create (&msg->xdr, g->msg_in + g->msg_in_consumed,
+                     g->msg_in_pos - g->msg_in_consumed, XDR_DECODE);
+
+      /* Read the message length */
+      if (!xdr_uint32_t (&msg->xdr, &g->msg_in_len)) {
+        error (g, _("can't decode length"));
+        xdr_destroy(&msg->xdr);
+        return;
+      }
+
+      if (g->verbose) {
+        fprintf(stderr, "message length is %u\n", g->msg_in_len);
+      }
+
+      /* Special cases for messages which are represented by magic message
+       * lengths */
+      if (GUESTFS_LAUNCH_FLAG == g->msg_in_len ||
+          GUESTFS_CANCEL_FLAG == g->msg_in_len) {
+        msg->len = g->msg_in_len;
+        g->msg_in_len = sizeof(g->msg_in_len);
+        break;
+      }
+
+      else if (g->msg_in_len > GUESTFS_MESSAGE_MAX) {
+        error (g, _("message length (%u) > maximum possible size (%d)"),
+               g->msg_in_len, GUESTFS_MESSAGE_MAX);
+        /* We're doomed at this point. */
+        abort();
+      }
+
+      /* Quit the loop if we've got a complete message */
+      else if (available >= g->msg_in_len + sizeof (g->msg_in_len)) {
+        /* Total message length includes the length header itself */
+        g->msg_in_len += sizeof (g->msg_in_len);
+        msg->len = g->msg_in_len;
+        break;
+      }
+
+      xdr_destroy (&msg->xdr);
+    }
+
+    if(!blocking) return;
+
+    ml->main_loop_run (ml, g);
   }
-  n = read (g->sock, g->msg_in + g->msg_in_size,
-	    g->msg_in_allocated - g->msg_in_size);
-  if (n == 0) {
-    /* Disconnected. */
-    child_cleanup (g);
-    return;
+}
+
+void
+guestfs_free_reply (guestfs_h *g, guestfs_reply_t *msg)
+{
+  /* Check that the whole message has been consumed */
+  if (g->msg_in_len != xdr_getpos(&msg->xdr)) {
+    error (g, _("guestfs_free_reply: consumed %u bytes of %u length message"),
+           xdr_getpos(&msg->xdr), g->msg_in_len);
   }
 
-  if (n == -1) {
-    if (errno != EINTR && errno != EAGAIN)
-      perrorf (g, "read");
-    return;
+  xdr_destroy (&msg->xdr);
+
+  g->msg_in_consumed += g->msg_in_len;
+
+  if (g->verbose) {
+    fprintf (stderr, "guestfs_free_reply: consumed %u bytes\n",
+             g->msg_in_consumed);
   }
 
-  g->msg_in_size += n;
+  /* If there's nothing left in the buffer, reset to the beginning */
+  if (g->msg_in_consumed == g->msg_in_pos) {
+    g->msg_in_consumed = 0;
+    g->msg_in_pos = 0;
+  }
 
-  /* Have we got enough of a message to be able to process it yet? */
- again:
-  if (g->msg_in_size < 4) return;
+  /* If we got to the end of the buffer, move to the beginning */
+  else if (g->msg_in_size == g->msg_in_pos) {
+    memmove (g->msg_in, g->msg_in + g->msg_in_consumed,
+             g->msg_in_size - g->msg_in_consumed);
+    g->msg_in_pos = g->msg_in_size - g->msg_in_consumed;
+    g->msg_in_consumed = 0;
+  }
+
+  g->msg_in_len = 0;
 
-  xdrmem_create (&xdr, g->msg_in, g->msg_in_size, XDR_DECODE);
-  if (!xdr_uint32_t (&xdr, &len)) {
-    error (g, _("can't decode length word"));
-    goto cleanup;
+  /* If we were ignoring incoming messages, switch them back on */
+  if (!(g->sock_events & GUESTFS_HANDLE_READABLE)) {
+    g->sock_events |= GUESTFS_HANDLE_READABLE;
+    sock_update_events (g);
   }
+}
 
-  /* Length is normally the length of the message, but when guestfsd
-   * starts up it sends a "magic" value (longer than any possible
-   * message).  Check for this.
-   */
-  if (len == GUESTFS_LAUNCH_FLAG) {
-    if (g->state != LAUNCHING)
-      error (g, _("received magic signature from guestfsd, but in state %d"),
-	     g->state);
-    else if (g->msg_in_size != 4)
-      error (g, _("received magic signature from guestfsd, but msg size is %d"),
-	     g->msg_in_size);
-    else {
-      g->state = READY;
-      if (g->launch_done_cb)
-	g->launch_done_cb (g, g->launch_done_cb_data);
-    }
+/* This function is called whenever an event happens on the communications
+ * socket */
+static void
+sock_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data,
+            int watch, int fd, int events)
+{
+  if (g->verbose)
+    fprintf (stderr,
+	     "sock_event: %p g->state = %d, fd = %d, events = 0x%x\n",
+	     g, g->state, fd, events);
 
-    goto cleanup;
+  if (fd != g->sock) {
+    error (g, _("sock_event: received event for non-sock fd %d"), fd);
+    return;
   }
 
-  /* This can happen if a cancellation happens right at the end
-   * of us sending a FileIn parameter to the daemon.  Discard.  The
-   * daemon should send us an error message next.
-   */
-  if (len == GUESTFS_CANCEL_FLAG) {
-    g->msg_in_size -= 4;
-    memmove (g->msg_in, g->msg_in+4, g->msg_in_size);
-    goto again;
+  if (events & GUESTFS_HANDLE_READABLE) {
+    sock_read (g);
   }
 
-  /* If this happens, it's pretty bad and we've probably lost
-   * synchronization.
-   */
-  if (len > GUESTFS_MESSAGE_MAX) {
-    error (g, _("message length (%u) > maximum possible size (%d)"),
-	   len, GUESTFS_MESSAGE_MAX);
-    goto cleanup;
+  if (events & GUESTFS_HANDLE_WRITABLE) {
+    sock_write (g);
   }
 
-  if (g->msg_in_size-4 < len) return; /* Need more of this message. */
+  if (events & (GUESTFS_HANDLE_ERROR | GUESTFS_HANDLE_HANGUP)) {
+    error (g, _("sock_event: received error on communications socket: %m"));
+    child_cleanup (g);
+  }
+}
 
-  /* Got the full message, begin processing it. */
-#if 0
-  if (g->verbose) {
-    int i, j;
-
-    for (i = 0; i < g->msg_in_size; i += 16) {
-      printf ("%04x: ", i);
-      for (j = i; j < MIN (i+16, g->msg_in_size); ++j)
-	printf ("%02x ", (unsigned char) g->msg_in[j]);
-      for (; j < i+16; ++j)
-	printf ("   ");
-      printf ("|");
-      for (j = i; j < MIN (i+16, g->msg_in_size); ++j)
-	if (isprint (g->msg_in[j]))
-	  printf ("%c", g->msg_in[j]);
-	else
-	  printf (".");
-      for (; j < i+16; ++j)
-	printf (" ");
-      printf ("|\n");
+/* This function reads as much data as is available from the guestfsd
+ * communication socket.
+ */
+static void
+sock_read (guestfs_h *g)
+{
+  guestfs_main_loop *ml = guestfs_get_main_loop (g);
+
+  size_t available = g->msg_in_size - g->msg_in_pos;
+
+  if (g->verbose)
+    fprintf (stderr, "sock_read: before g->msg_in_pos = %d "
+                     "available = %d\n", g->msg_in_pos, available);
+
+  while (available > 0) {
+    ssize_t in;
+    while((in = read(g->sock, g->msg_in + g->msg_in_pos, available)) < 0) {
+      /* Retry if interrupted */
+      if(EINTR == errno) continue;
+
+      if(EWOULDBLOCK != errno) {
+        error (g, _("sock_read_data: error reading from socket: %s"),
+             strerror(errno));
+      }
+
+      break;
+    }
+
+    /* Cleanup if we got an EOF */
+    if (0 == in) {
+      child_cleanup(g);
+      return;
+    }
+
+    /* We got data */
+    else if (in > 0) {
+      available -= in;
+      g->msg_in_pos += in;
+    }
+
+    /* There was some error */
+    else {
+      break;
     }
   }
-#endif
 
-  /* Not in the expected state. */
-  if (g->state != BUSY)
-    error (g, _("state %d != BUSY"), g->state);
-
-  /* Push the message up to the higher layer. */
-  if (g->reply_cb)
-    g->reply_cb (g, g->reply_cb_data, &xdr);
-  else
-    /* This message (probably) should never be printed. */
-    fprintf (stderr, "libguesfs: sock_read_event: !!! dropped message !!!\n");
-
-  g->msg_in_size -= len + 4;
-  memmove (g->msg_in, g->msg_in+len+4, g->msg_in_size);
-  if (g->msg_in_size > 0) goto again;
-
- cleanup:
-  /* Free the message buffer if it's grown excessively large. */
-  if (g->msg_in_allocated > 65536) {
-    free (g->msg_in);
-    g->msg_in = NULL;
-    g->msg_in_size = g->msg_in_allocated = 0;
-  } else
-    g->msg_in_size = 0;
+  if (g->verbose)
+    fprintf (stderr, "sock_read: after g->msg_in_pos = %d "
+                     "available = %d\n", g->msg_in_pos, available);
 
-  xdr_destroy (&xdr);
+  /* Ignore further read events if we've filled the buffer */
+  if (0 == available) {
+    g->sock_events &= ~GUESTFS_HANDLE_READABLE;
+    sock_update_events (g);
+  }
+
+  /* Exit the main loop to give the caller a chance to pick up the message */
+  ml->main_loop_quit (ml, g);
 }
 
 /* The function is called whenever we can write something on the
  * guestfsd (daemon inside the guest) communication socket.
  */
 static void
-sock_write_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data,
-		  int watch, int fd, int events)
+sock_write (guestfs_h *g)
 {
   int n, err;
 
-  if (g->verbose)
-    fprintf (stderr,
-	     "sock_write_event: %p g->state = %d, fd = %d, events = 0x%x\n",
-	     g, g->state, fd, events);
-
-  if (g->sock != fd) {
-    error (g, _("sock_write_event: internal error: %d != %d"), g->sock, fd);
-    return;
-  }
-
   if (g->state != BUSY) {
-    error (g, _("sock_write_event: state %d != BUSY"), g->state);
+    error (g, _("sock_write: state %d != BUSY"), g->state);
     return;
   }
 
   if (g->verbose)
-    fprintf (stderr, "sock_write_event: writing %d bytes ...\n",
+    fprintf (stderr, "sock_write: writing %d bytes ...\n",
 	     g->msg_out_size - g->msg_out_pos);
 
   n = write (g->sock, g->msg_out + g->msg_out_pos,
@@ -1778,7 +1820,7 @@ sock_write_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data,
   }
 
   if (g->verbose)
-    fprintf (stderr, "sock_write_event: wrote %d bytes\n", n);
+    fprintf (stderr, "sock_write: wrote %d bytes\n", n);
 
   g->msg_out_pos += n;
 
@@ -1787,7 +1829,7 @@ sock_write_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data,
     return;
 
   if (g->verbose)
-    fprintf (stderr, "sock_write_event: done writing, calling send_cb\n");
+    fprintf (stderr, "sock_write: done writing, calling send_cb\n");
 
   free (g->msg_out);
   g->msg_out = NULL;
@@ -1807,14 +1849,6 @@ guestfs_set_send_callback (guestfs_h *g,
 }
 
 void
-guestfs_set_reply_callback (guestfs_h *g,
-			    guestfs_reply_cb cb, void *opaque)
-{
-  g->reply_cb = cb;
-  g->reply_cb_data = opaque;
-}
-
-void
 guestfs_set_log_message_callback (guestfs_h *g,
 				  guestfs_log_message_cb cb, void *opaque)
 {
@@ -1830,14 +1864,6 @@ guestfs_set_subprocess_quit_callback (guestfs_h *g,
   g->subprocess_quit_cb_data = opaque;
 }
 
-void
-guestfs_set_launch_done_callback (guestfs_h *g,
-				  guestfs_launch_done_cb cb, void *opaque)
-{
-  g->launch_done_cb = cb;
-  g->launch_done_cb_data = opaque;
-}
-
 /* Access to the handle's main loop and the default main loop. */
 void
 guestfs_set_main_loop (guestfs_h *g, guestfs_main_loop *main_loop)
@@ -1857,35 +1883,15 @@ guestfs_get_default_main_loop (void)
   return (guestfs_main_loop *) &default_main_loop;
 }
 
-/* Change the daemon socket handler so that we are now writing.
- * This sets the handle to sock_write_event.
+/* Update the events which will cause the daemon socket callback to be called.
  */
-int
-guestfs__switch_to_sending (guestfs_h *g)
+static int
+sock_update_events (guestfs_h *g)
 {
-  if (g->sock_watch >= 0) {
-    if (g->main_loop->remove_handle (g->main_loop, g, g->sock_watch) == -1) {
-      error (g, _("remove_handle failed"));
-      g->sock_watch = -1;
-      return -1;
-    }
-  }
-
-  g->sock_watch =
-    g->main_loop->add_handle (g->main_loop, g, g->sock,
-			      GUESTFS_HANDLE_WRITABLE,
-			      sock_write_event, NULL);
-  if (g->sock_watch == -1) {
-    error (g, _("add_handle failed"));
-    return -1;
+  if(g->verbose) {
+    fprintf(stderr, "updating socket events to %d\n", g->sock_events);
   }
 
-  return 0;
-}
-
-int
-guestfs__switch_to_receiving (guestfs_h *g)
-{
   if (g->sock_watch >= 0) {
     if (g->main_loop->remove_handle (g->main_loop, g, g->sock_watch) == -1) {
       error (g, _("remove_handle failed"));
@@ -1896,8 +1902,8 @@ guestfs__switch_to_receiving (guestfs_h *g)
 
   g->sock_watch =
     g->main_loop->add_handle (g->main_loop, g, g->sock,
-			      GUESTFS_HANDLE_READABLE,
-			      sock_read_event, NULL);
+			      g->sock_events,
+			      sock_event, NULL);
   if (g->sock_watch == -1) {
     error (g, _("add_handle failed"));
     return -1;
@@ -1989,7 +1995,9 @@ guestfs__send_sync (guestfs_h *g, int proc_nr,
   xdrmem_create (&xdr, g->msg_out, 4, XDR_ENCODE);
   xdr_uint32_t (&xdr, &len);
 
-  if (guestfs__switch_to_sending (g) == -1)
+  /* Respond to writable events */
+  g->sock_events |= GUESTFS_HANDLE_WRITABLE;
+  if (sock_update_events (g) == -1)
     goto cleanup1;
 
   sent = 0;
@@ -2001,6 +2009,13 @@ guestfs__send_sync (guestfs_h *g, int proc_nr,
     goto cleanup1;
   }
 
+  /* No longer interested in writable events */
+  g->sock_events &= ~(GUESTFS_HANDLE_WRITABLE);
+  if (sock_update_events (g) == -1) {
+    error (g, _("guestfs__send_sync: "
+                "failed to remove socket writable callback\n"));
+  }
+
   return serial;
 
  cleanup1:
@@ -2100,8 +2115,6 @@ send_file_complete_sync (guestfs_h *g)
 /* Send a chunk, cancellation or end of file, synchronously (ie. wait
  * for it to go).
  */
-static int check_for_daemon_cancellation (guestfs_h *g);
-
 static int
 send_file_chunk_sync (guestfs_h *g, int cancel, const char *buf, size_t buflen)
 {
@@ -2111,6 +2124,8 @@ send_file_chunk_sync (guestfs_h *g, int cancel, const char *buf, size_t buflen)
   XDR xdr;
   guestfs_main_loop *ml = guestfs_get_main_loop (g);
 
+  guestfs_reply_t cancellation = {};
+
   if (g->state != BUSY) {
     error (g, _("send_file_chunk_sync: state %d != READY"), g->state);
     return -1;
@@ -2125,11 +2140,15 @@ send_file_chunk_sync (guestfs_h *g, int cancel, const char *buf, size_t buflen)
   }
 
   /* Did the daemon send a cancellation message? */
-  if (check_for_daemon_cancellation (g)) {
+  guestfs_get_reply (g, &cancellation, 0);
+  if (GUESTFS_CANCEL_FLAG == cancellation.len) {
     if (g->verbose)
       fprintf (stderr, "got daemon cancellation\n");
     return -2;
   }
+  if (0 != cancellation.len) {
+    guestfs_free_reply (g, &cancellation);
+  }
 
   /* Allocate the chunk buffer.  Don't use the stack to avoid
    * excessive stack usage and unnecessary copies.
@@ -2160,7 +2179,9 @@ send_file_chunk_sync (guestfs_h *g, int cancel, const char *buf, size_t buflen)
   xdrmem_create (&xdr, g->msg_out, 4, XDR_ENCODE);
   xdr_uint32_t (&xdr, &len);
 
-  if (guestfs__switch_to_sending (g) == -1)
+  /* Respond to socket writable events */
+  g->sock_events |= GUESTFS_HANDLE_WRITABLE;
+  if (sock_update_events (g) == -1)
     goto cleanup1;
 
   sent = 0;
@@ -2172,6 +2193,12 @@ send_file_chunk_sync (guestfs_h *g, int cancel, const char *buf, size_t buflen)
     goto cleanup1;
   }
 
+  g->sock_events &= ~(GUESTFS_HANDLE_WRITABLE);
+  if (sock_update_events (g) == -1) {
+    error (g, _("send_file_chunk_sync: "
+                "failed to remove socket writable callback\n"));
+  }
+
   return 0;
 
  cleanup1:
@@ -2181,52 +2208,6 @@ send_file_chunk_sync (guestfs_h *g, int cancel, const char *buf, size_t buflen)
   return -1;
 }
 
-/* At this point we are sending FileIn file(s) to the guest, and not
- * expecting to read anything, so if we do read anything, it must be
- * a cancellation message.  This checks for this case without blocking.
- */
-static int
-check_for_daemon_cancellation (guestfs_h *g)
-{
-  fd_set rset;
-  struct timeval tv;
-  int r;
-  char buf[4];
-  uint32_t flag;
-  XDR xdr;
-
-  FD_ZERO (&rset);
-  FD_SET (g->sock, &rset);
-  tv.tv_sec = 0;
-  tv.tv_usec = 0;
-  r = select (g->sock+1, &rset, NULL, NULL, &tv);
-  if (r == -1) {
-    perrorf (g, "select");
-    return 0;
-  }
-  if (r == 0)
-    return 0;
-
-  /* Read the message from the daemon. */
-  r = xread (g->sock, buf, sizeof buf);
-  if (r == -1) {
-    perrorf (g, "read");
-    return 0;
-  }
-
-  xdrmem_create (&xdr, buf, sizeof buf, XDR_DECODE);
-  xdr_uint32_t (&xdr, &flag);
-  xdr_destroy (&xdr);
-
-  if (flag != GUESTFS_CANCEL_FLAG) {
-    error (g, _("check_for_daemon_cancellation: read 0x%x from daemon, expected 0x%x\n"),
-	   flag, GUESTFS_CANCEL_FLAG);
-    return 0;
-  }
-
-  return 1;
-}
-
 /* Synchronously receive a file. */
 
 /* Returns -1 = error, 0 = EOF, 1 = more data */
@@ -2238,6 +2219,7 @@ guestfs__receive_file_sync (guestfs_h *g, const char *filename)
   void *buf;
   int fd, r;
   size_t len;
+  size_t total_bytes = 0, total_receives = 0;
 
   fd = open (filename, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0666);
   if (fd == -1) {
@@ -2247,6 +2229,11 @@ guestfs__receive_file_sync (guestfs_h *g, const char *filename)
 
   /* Receive the file in chunked encoding. */
   while ((r = receive_file_data_sync (g, &buf, &len)) >= 0) {
+    if (g->verbose) {
+      fprintf (stderr, "guestfs__receive_file_sync: "
+                       "writing %zu bytes to %s\n", len, filename);
+    }
+
     if (xwrite (fd, buf, len) == -1) {
       perrorf (g, "%s: write", filename);
       free (buf);
@@ -2254,6 +2241,9 @@ guestfs__receive_file_sync (guestfs_h *g, const char *filename)
     }
     free (buf);
+    /* Count every write, including the final one made at end of file. */
+    total_bytes += len;
+    total_receives++;
     if (r == 0) break; /* End of file. */
   }
 
   if (r == -1) {
@@ -2266,6 +2256,12 @@ guestfs__receive_file_sync (guestfs_h *g, const char *filename)
     return -1;
   }
 
+  if (g->verbose) {
+    fprintf (stderr, "guestfs__receive_file_sync: "
+                     "wrote %zu bytes in %zu calls to %s\n",
+                     total_bytes, total_receives, filename);
+  }
+
   return 0;
 
  cancel: ;
@@ -2291,117 +2287,75 @@ guestfs__receive_file_sync (guestfs_h *g, const char *filename)
   return -1;
 }
 
-/* Note that the reply callback can be called multiple times before
- * the main loop quits and we get back to the synchronous code.  So
- * we have to be prepared to save multiple chunks on a list here.
- */
-struct receive_file_ctx {
-  int count;			/* 0 if receive_file_cb not called, or
-				 * else count number of chunks.
-				 */
-  guestfs_chunk *chunks;	/* Array of chunks. */
-};
-
-static void
-free_chunks (struct receive_file_ctx *ctx)
-{
-  int i;
-
-  for (i = 0; i < ctx->count; ++i)
-    free (ctx->chunks[i].data.data_val);
-
-  free (ctx->chunks);
-}
-
-static void
-receive_file_cb (guestfs_h *g, void *data, XDR *xdr)
-{
-  guestfs_main_loop *ml = guestfs_get_main_loop (g);
-  struct receive_file_ctx *ctx = (struct receive_file_ctx *) data;
-  guestfs_chunk chunk;
-
-  if (ctx->count == -1)		/* Parse error occurred previously. */
-    return;
-
-  ml->main_loop_quit (ml, g);
-
-  memset (&chunk, 0, sizeof chunk);
-
-  if (!xdr_guestfs_chunk (xdr, &chunk)) {
-    error (g, _("failed to parse file chunk"));
-    free_chunks (ctx);
-    ctx->chunks = NULL;
-    ctx->count = -1;
-    return;
-  }
-
-  /* Copy the chunk to the list. */
-  ctx->chunks = safe_realloc (g, ctx->chunks,
-			      sizeof (guestfs_chunk) * (ctx->count+1));
-  ctx->chunks[ctx->count] = chunk;
-  ctx->count++;
-}
-
 /* Receive a chunk of file data. */
 /* Returns -1 = error, 0 = EOF, 1 = more data */
 static int
-receive_file_data_sync (guestfs_h *g, void **buf, size_t *len_r)
+receive_file_data_sync (guestfs_h *g, void **buf, size_t *len)
 {
-  struct receive_file_ctx ctx;
-  guestfs_main_loop *ml = guestfs_get_main_loop (g);
-  int i;
-  size_t len;
+  unsigned char block = 1;
 
-  ctx.count = 0;
-  ctx.chunks = NULL;
+  /* Accumulate data in this buffer. */
+  if (buf) *buf = NULL;
+  if (len) *len = 0;
 
-  guestfs_set_reply_callback (g, receive_file_cb, &ctx);
-  (void) ml->main_loop_run (ml, g);
-  guestfs_set_reply_callback (g, NULL, NULL);
+  /* Buffer chunks until one of:
+   * There are no more immediately available
+   * The buffer got too big
+   * We got the last chunk
+   */
+  for(;;) {
+    guestfs_reply_t reply;
+    guestfs_chunk chunk;
+
+    /* Get a reply message if there's one available */
+    guestfs_get_reply (g, &reply, block);
+
+    /* No more replies available right now */
+    if (0 == reply.len) {
+      /* Still blocking means no chunk yet (buf may be NULL: don't test *buf) */
+      if (block) {
+        error (g, _("receive_file_data: error receiving chunk"));
+        guestfs_free_reply (g, &reply);
+        return -1;
+      }
 
-  if (ctx.count == 0) {
-    error (g, _("receive_file_data_sync: reply callback not called\n"));
-    return -1;
-  }
+      break;
+    }
 
-  if (ctx.count == -1) {
-    error (g, _("receive_file_data_sync: parse error in reply callback\n"));
-    /* callback already freed the chunks */
-    return -1;
-  }
+    if (g->verbose)
+      fprintf (stderr, "receive_file_data_sync: got chunk\n");
+    
+    memset (&chunk, 0, sizeof (chunk));
 
-  if (g->verbose)
-    fprintf (stderr, "receive_file_data_sync: got %d chunks\n", ctx.count);
-
-  /* Process each chunk in the list. */
-  if (buf) *buf = NULL;		/* Accumulate data in this buffer. */
-  len = 0;
-
-  for (i = 0; i < ctx.count; ++i) {
-    if (ctx.chunks[i].cancel) {
-      error (g, _("file receive cancelled by daemon"));
-      free_chunks (&ctx);
-      if (buf) free (*buf);
-      if (len_r) *len_r = 0;
+    if (!xdr_guestfs_chunk (&reply.xdr, &chunk)) {
+      error (g, _("receive_file_data_sync: failed to parse file chunk"));
+      guestfs_free_reply (g, &reply);
       return -1;
     }
 
-    if (ctx.chunks[i].data.data_len == 0) { /* end of transfer */
-      free_chunks (&ctx);
-      if (len_r) *len_r = len;
+    /* Copy the chunk into the output buffer */
+    if (len && buf) {
+      *buf = safe_realloc (g, *buf, *len + chunk.data.data_len);
+      memcpy (*buf + *len, chunk.data.data_val, chunk.data.data_len);
+      *len += chunk.data.data_len;
+    }
+
+    guestfs_free_reply (g, &reply);
+
+    /* Received the last chunk */
+    if (0 == chunk.data.data_len) {
       return 0;
     }
 
-    if (buf) {
-      *buf = safe_realloc (g, *buf, len + ctx.chunks[i].data.data_len);
-      memcpy (*buf+len, ctx.chunks[i].data.data_val,
-	      ctx.chunks[i].data.data_len);
+    /* Return early if the output buffer length is bigger than some
+     * arbitrary size. Note that the output buffer may exceed this size. */
+    if (len && *len > 1024 * 1024) {
+      break;
     }
-    len += ctx.chunks[i].data.data_len;
+
+    block = 0;
   }
 
-  if (len_r) *len_r = len;
-  free_chunks (&ctx);
   return 1;
 }
 
diff --git a/src/guestfs.h b/src/guestfs.h
index 264986f..4f19327 100644
--- a/src/guestfs.h
+++ b/src/guestfs.h
@@ -64,13 +64,10 @@ typedef void (*guestfs_send_cb) (guestfs_h *g, void *data);
 typedef void (*guestfs_reply_cb) (guestfs_h *g, void *data, XDR *xdr);
 typedef void (*guestfs_log_message_cb) (guestfs_h *g, void *data, char *buf, int len);
 typedef void (*guestfs_subprocess_quit_cb) (guestfs_h *g, void *data);
-typedef void (*guestfs_launch_done_cb) (guestfs_h *g, void *data);
 
 extern void guestfs_set_send_callback (guestfs_h *g, guestfs_send_cb cb, void *opaque);
-extern void guestfs_set_reply_callback (guestfs_h *g, guestfs_reply_cb cb, void *opaque);
 extern void guestfs_set_log_message_callback (guestfs_h *g, guestfs_log_message_cb cb, void *opaque);
 extern void guestfs_set_subprocess_quit_callback (guestfs_h *g, guestfs_subprocess_quit_cb cb, void *opaque);
-extern void guestfs_set_launch_done_callback (guestfs_h *g, guestfs_launch_done_cb cb, void *opaque);
 
 extern void guestfs_error (guestfs_h *g, const char *fs, ...)
   __attribute__((format (printf,2,3)));
@@ -82,8 +79,14 @@ extern void *guestfs_safe_realloc (guestfs_h *g, void *ptr, int nbytes);
 extern char *guestfs_safe_strdup (guestfs_h *g, const char *str);
 extern void *guestfs_safe_memdup (guestfs_h *g, void *ptr, size_t size);
 
-extern int guestfs__switch_to_sending (guestfs_h *g);
-extern int guestfs__switch_to_receiving (guestfs_h *g);
+typedef struct {
+    u_int32_t len;
+    XDR xdr;
+} guestfs_reply_t;
+
+extern void guestfs_get_reply (guestfs_h *g, guestfs_reply_t *msg,
+                               unsigned char blocking);
+extern void guestfs_free_reply (guestfs_h *g, guestfs_reply_t *msg);
 
 /* These *_sync calls wait until the action is performed, using the
  * main loop.  We should implement asynchronous versions too.
-- 
1.6.2.5


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]