[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[libvirt] [PATCH] virCommand: Don't misuse the eventloop for async IO



Currently, if a command wants to do asynchronous IO, a callback
is registered in the libvirtd eventloop to handle writes and
reads. However, there's a race in virCommandWait. The eventloop
may already be executing the callback, while virCommandWait is
mangling internal state of virCommand. To deal with it, we need
to either introduce locking or spawn a separate thread where we
poll() on stdio from child. The former, however, requires to
unlock all mutexes held, as the event loop may execute other
callbacks which tries to lock one of the mutexes, deadlock and
thus never wake us up. So it's safer to spawn a separate thread.
---

This is an alternative to:

https://www.redhat.com/archives/libvir-list/2013-February/msg00352.html

 src/util/vircommand.c | 342 +++++++++++---------------------------------------
 1 file changed, 76 insertions(+), 266 deletions(-)

diff --git a/src/util/vircommand.c b/src/util/vircommand.c
index db7dbca..160f4c2 100644
--- a/src/util/vircommand.c
+++ b/src/util/vircommand.c
@@ -42,6 +42,7 @@
 #include "virpidfile.h"
 #include "virprocess.h"
 #include "virbuffer.h"
+#include "virthread.h"
 
 #define VIR_FROM_THIS VIR_FROM_NONE
 
@@ -80,15 +81,13 @@ struct _virCommand {
     char **errbuf;
 
     int infd;
+    int inpipe;
     int outfd;
     int errfd;
     int *outfdptr;
     int *errfdptr;
 
-    size_t inbufOffset;
-    int inWatch;
-    int outWatch;
-    int errWatch;
+    virThreadPtr asyncioThread;
 
     bool handshake;
     int handshakeWait[2];
@@ -784,8 +783,7 @@ virCommandNewArgs(const char *const*args)
     cmd->handshakeNotify[0] = -1;
     cmd->handshakeNotify[1] = -1;
 
-    cmd->infd = cmd->outfd = cmd->errfd = -1;
-    cmd->inWatch = cmd->outWatch = cmd->errWatch = -1;
+    cmd->infd = cmd->inpipe = cmd->outfd = cmd->errfd = -1;
     cmd->pid = -1;
 
     virCommandAddArgSet(cmd, args);
@@ -1703,19 +1701,17 @@ virCommandToString(virCommandPtr cmd)
  * Manage input and output to the child process.
  */
 static int
-virCommandProcessIO(virCommandPtr cmd, int *inpipe)
+virCommandProcessIO(virCommandPtr cmd)
 {
-    int infd = -1, outfd = -1, errfd = -1;
+    int outfd = -1, errfd = -1;
     size_t inlen = 0, outlen = 0, errlen = 0;
     size_t inoff = 0;
     int ret = 0;
 
     /* With an input buffer, feed data to child
      * via pipe */
-    if (cmd->inbuf) {
+    if (cmd->inbuf)
         inlen = strlen(cmd->inbuf);
-        infd = *inpipe;
-    }
 
     /* With out/err buffer, the outfd/errfd have been filled with an
      * FD for us.  Guarantee an allocated string with partial results
@@ -1744,8 +1740,8 @@ virCommandProcessIO(virCommandPtr cmd, int *inpipe)
         struct pollfd fds[3];
         int nfds = 0;
 
-        if (infd != -1) {
-            fds[nfds].fd = infd;
+        if (cmd->inpipe != -1) {
+            fds[nfds].fd = cmd->inpipe;
             fds[nfds].events = POLLOUT;
             fds[nfds].revents = 0;
             nfds++;
@@ -1817,21 +1813,19 @@ virCommandProcessIO(virCommandPtr cmd, int *inpipe)
             }
 
             if (fds[i].revents & (POLLOUT | POLLERR) &&
-                fds[i].fd == infd) {
+                fds[i].fd == cmd->inpipe) {
                 int done;
 
                 /* Coverity 5.3.0 can't see that we only get here if
                  * infd is in the set because it was non-negative.  */
                 sa_assert(infd != -1);
-                done = write(infd, cmd->inbuf + inoff,
+                done = write(cmd->inpipe, cmd->inbuf + inoff,
                              inlen - inoff);
                 if (done < 0) {
                     if (errno == EPIPE) {
                         VIR_DEBUG("child closed stdin early, ignoring EPIPE "
-                                  "on fd %d", infd);
-                        if (VIR_CLOSE(*inpipe) < 0)
-                            VIR_DEBUG("ignoring failed close on fd %d", infd);
-                        infd = -1;
+                                  "on fd %d", cmd->inpipe);
+                        VIR_FORCE_CLOSE(cmd->inpipe);
                     } else if (errno != EINTR && errno != EAGAIN) {
                         virReportSystemError(errno, "%s",
                                              _("unable to write to child input"));
@@ -1839,11 +1833,8 @@ virCommandProcessIO(virCommandPtr cmd, int *inpipe)
                     }
                 } else {
                     inoff += done;
-                    if (inoff == inlen) {
-                        if (VIR_CLOSE(*inpipe) < 0)
-                            VIR_DEBUG("ignoring failed close on fd %d", infd);
-                        infd = -1;
-                    }
+                    if (inoff == inlen)
+                        VIR_FORCE_CLOSE(cmd->inpipe);
                 }
             }
         }
@@ -1914,7 +1905,6 @@ virCommandRun(virCommandPtr cmd, int *exitstatus)
     int ret = 0;
     char *outbuf = NULL;
     char *errbuf = NULL;
-    int infd[2] = { -1, -1 };
     struct stat st;
     bool string_io;
     bool async_io = false;
@@ -1960,18 +1950,6 @@ virCommandRun(virCommandPtr cmd, int *exitstatus)
         }
     }
 
-    /* If we have an input buffer, we need
-     * a pipe to feed the data to the child */
-    if (cmd->inbuf) {
-        if (pipe2(infd, O_CLOEXEC) < 0) {
-            virReportSystemError(errno, "%s",
-                                 _("unable to open pipe"));
-            cmd->has_error = -1;
-            return -1;
-        }
-        cmd->infd = infd[0];
-    }
-
     /* If caller requested the same string for stdout and stderr, then
      * merge those into one string.  */
     if (cmd->outbuf && cmd->outbuf == cmd->errbuf) {
@@ -1999,23 +1977,14 @@ virCommandRun(virCommandPtr cmd, int *exitstatus)
 
     cmd->flags |= VIR_EXEC_RUN_SYNC;
     if (virCommandRunAsync(cmd, NULL) < 0) {
-        if (cmd->inbuf) {
-            tmpfd = infd[0];
-            if (VIR_CLOSE(infd[0]) < 0)
-                VIR_DEBUG("ignoring failed close on fd %d", tmpfd);
-            tmpfd = infd[1];
-            if (VIR_CLOSE(infd[1]) < 0)
-                VIR_DEBUG("ignoring failed close on fd %d", tmpfd);
-        }
         cmd->has_error = -1;
         return -1;
     }
 
-    tmpfd = infd[0];
-    if (VIR_CLOSE(infd[0]) < 0)
-        VIR_DEBUG("ignoring failed close on fd %d", tmpfd);
-    if (string_io)
-        ret = virCommandProcessIO(cmd, &infd[1]);
+    if (string_io) {
+        VIR_FORCE_CLOSE(cmd->infd);
+        ret = virCommandProcessIO(cmd);
+    }
 
     if (virCommandWait(cmd, exitstatus) < 0)
         ret = -1;
@@ -2031,11 +2000,7 @@ virCommandRun(virCommandPtr cmd, int *exitstatus)
 
     /* Reset any capturing, in case caller runs
      * this identical command again */
-    if (cmd->inbuf) {
-        tmpfd = infd[1];
-        if (VIR_CLOSE(infd[1]) < 0)
-            VIR_DEBUG("ignoring failed close on fd %d", tmpfd);
-    }
+    VIR_FORCE_CLOSE(cmd->inpipe);
     if (cmd->outbuf == &outbuf) {
         tmpfd = cmd->outfd;
         if (VIR_CLOSE(cmd->outfd) < 0)
@@ -2135,177 +2100,13 @@ virCommandHook(void *data)
 
 
 static void
-virCommandHandleReadWrite(int watch, int fd, int events, void *opaque)
-{
-    virCommandPtr cmd = (virCommandPtr) opaque;
-    char ***bufptr = NULL;
-    char buf[1024];
-    ssize_t nread, nwritten;
-    size_t len = 0;
-    int *watchPtr = NULL;
-    bool eof = false;
-    int *fdptr = NULL, **fdptrptr = NULL;
-
-    VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events);
-    errno = 0;
-
-    if (watch == cmd->inWatch) {
-        watchPtr = &cmd->inWatch;
-        fdptr  = &cmd->infd;
-
-        if (events & VIR_EVENT_HANDLE_WRITABLE) {
-            len = strlen(cmd->inbuf);
-
-            while (true) {
-                nwritten = write(fd, cmd->inbuf + cmd->inbufOffset,
-                                 len - cmd->inbufOffset);
-                if (nwritten < 0) {
-                    if (errno != EAGAIN && errno != EINTR) {
-                        virReportSystemError(errno,
-                                             _("Unable to write command's "
-                                               "input to FD %d"),
-                                             fd);
-                        eof = true;
-                    }
-                    break;
-                }
-
-                if (nwritten == 0) {
-                    eof = true;
-                    break;
-                }
-
-                cmd->inbufOffset += nwritten;
-                if (cmd->inbufOffset == len) {
-                    VIR_FORCE_CLOSE(cmd->infd);
-                    eof = true;
-                    break;
-                }
-            }
-
-        }
-    } else {
-        if (watch == cmd->outWatch) {
-            watchPtr = &cmd->outWatch;
-            bufptr = &cmd->outbuf;
-            fdptr = &cmd->outfd;
-            fdptrptr = &cmd->outfdptr;
-        } else {
-            watchPtr = &cmd->errWatch;
-            bufptr = &cmd->errbuf;
-            fdptr = &cmd->errfd;
-            fdptrptr = &cmd->errfdptr;
-        }
-
-        if (events & VIR_EVENT_HANDLE_READABLE) {
-            if (**bufptr)
-                len = strlen(**bufptr);
-
-            while (true) {
-                nread = read(fd, buf, sizeof(buf));
-                if (nread < 0) {
-                    if (errno != EAGAIN && errno != EINTR) {
-                        virReportSystemError(errno,
-                                             _("unable to read command's "
-                                               "output from FD %d"),
-                                             fd);
-                        eof = true;
-                    }
-                    break;
-                }
-
-                if (nread == 0) {
-                    eof = true;
-                    break;
-                }
-
-                if (VIR_REALLOC_N(**bufptr, len + nread + 1) < 0) {
-                    virReportOOMError();
-                    break;
-                }
-
-                memcpy(**bufptr + len, buf, nread);
-                (**bufptr)[len + nread] = '\0';
-            }
-
-        }
-    }
-
-    if (eof || (events & VIR_EVENT_HANDLE_HANGUP) ||
-        (events & VIR_EVENT_HANDLE_ERROR)) {
-        virEventRemoveHandle(watch);
-
-        *watchPtr = -1;
-        VIR_FORCE_CLOSE(*fdptr);
-        if (bufptr)
-            *bufptr = NULL;
-        if (fdptrptr)
-            *fdptrptr = NULL;
-    }
-}
-
-
-static int
-virCommandRegisterEventLoop(virCommandPtr cmd)
+virCommandDoAsyncIOHelper(void *opaque)
 {
-    int ret = -1;
-
-    if (cmd->inbuf &&
-        (cmd->inWatch = virEventAddHandle(cmd->infd,
-                                          VIR_EVENT_HANDLE_WRITABLE |
-                                          VIR_EVENT_HANDLE_HANGUP |
-                                          VIR_EVENT_HANDLE_ERROR,
-                                          virCommandHandleReadWrite,
-                                          cmd, NULL)) < 0) {
-        virReportError(VIR_ERR_INTERNAL_ERROR,
-                       _("Unable to register infd %d in the event loop"),
-                       cmd->infd);
-        goto cleanup;
+    virCommandPtr cmd = opaque;
+    if (virCommandProcessIO(cmd) < 0) {
+        /* If something went wrong, save errno or -1*/
+        cmd->has_error = errno ? errno : -1;
     }
-
-    if (cmd->outbuf && cmd->outfdptr == &cmd->outfd &&
-        (cmd->outWatch = virEventAddHandle(cmd->outfd,
-                                           VIR_EVENT_HANDLE_READABLE |
-                                           VIR_EVENT_HANDLE_HANGUP |
-                                           VIR_EVENT_HANDLE_ERROR,
-                                           virCommandHandleReadWrite,
-                                           cmd, NULL)) < 0) {
-        virReportError(VIR_ERR_INTERNAL_ERROR,
-                       _("Unable to register outfd %d in the event loop"),
-                       cmd->outfd);
-
-        if (cmd->inWatch != -1) {
-            virEventRemoveHandle(cmd->inWatch);
-            cmd->inWatch = -1;
-        }
-        goto cleanup;
-    }
-
-    if (cmd->errbuf && cmd->errfdptr == &cmd->errfd &&
-        (cmd->errWatch = virEventAddHandle(cmd->errfd,
-                                           VIR_EVENT_HANDLE_READABLE |
-                                           VIR_EVENT_HANDLE_HANGUP |
-                                           VIR_EVENT_HANDLE_ERROR,
-                                           virCommandHandleReadWrite,
-                                           cmd, NULL)) < 0) {
-        virReportError(VIR_ERR_INTERNAL_ERROR,
-                       _("Unable to register errfd %d in the event loop"),
-                       cmd->errfd);
-        if (cmd->inWatch != -1) {
-            virEventRemoveHandle(cmd->inWatch);
-            cmd->inWatch = -1;
-        }
-        if (cmd->outWatch != -1) {
-            virEventRemoveHandle(cmd->outWatch);
-            cmd->outWatch = -1;
-        }
-        goto cleanup;
-    }
-
-    ret = 0;
-
-cleanup:
-    return ret;
 }
 
 
@@ -2351,23 +2152,21 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
     synchronous = cmd->flags & VIR_EXEC_RUN_SYNC;
     cmd->flags &= ~VIR_EXEC_RUN_SYNC;
 
-    /* Buffer management can only be requested via virCommandRun, unless help
-     * from the event loop has been requested via virCommandDoAsyncIO. */
-    if (cmd->flags & VIR_EXEC_ASYNC_IO) {
-        /* If we have an input buffer, we need
-         * a pipe to feed the data to the child */
-        if (cmd->inbuf && cmd->infd == -1) {
-            if (pipe2(infd, O_CLOEXEC) < 0) {
-                virReportSystemError(errno, "%s",
-                                     _("unable to open pipe"));
-                cmd->has_error = -1;
-                return -1;
-            }
-            cmd->infd = infd[0];
+    /* Buffer management can only be requested via virCommandRun or
+     * virCommandDoAsyncIO. */
+    if (cmd->inbuf && cmd->infd == -1 &&
+        (synchronous || cmd->flags & VIR_EXEC_ASYNC_IO)) {
+        if (pipe2(infd, O_CLOEXEC) < 0) {
+            virReportSystemError(errno, "%s",
+                                 _("unable to open pipe"));
+            cmd->has_error = -1;
+            return -1;
         }
+        cmd->infd = infd[0];
+        cmd->inpipe = infd[1];
     } else if ((cmd->inbuf && cmd->infd == -1) ||
-         (cmd->outbuf && cmd->outfdptr != &cmd->outfd) ||
-         (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) {
+               (cmd->outbuf && cmd->outfdptr != &cmd->outfd) ||
+               (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) {
         virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                        _("cannot mix string I/O with asynchronous command"));
         return -1;
@@ -2377,24 +2176,24 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
         virReportError(VIR_ERR_INTERNAL_ERROR,
                        _("command is already running as pid %lld"),
                        (long long) cmd->pid);
-        return -1;
+        goto cleanup;
     }
 
     if (!synchronous && (cmd->flags & VIR_EXEC_DAEMON)) {
         virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                        _("daemonized command cannot use virCommandRunAsync"));
-        return -1;
+        goto cleanup;
     }
     if (cmd->pwd && (cmd->flags & VIR_EXEC_DAEMON)) {
         virReportError(VIR_ERR_INTERNAL_ERROR,
                        _("daemonized command cannot set working directory %s"),
                        cmd->pwd);
-        return -1;
+        goto cleanup;
     }
     if (cmd->pidfile && !(cmd->flags & VIR_EXEC_DAEMON)) {
         virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                        _("creation of pid file requires daemonized command"));
-        return -1;
+        goto cleanup;
     }
 
     str = virCommandToString(cmd);
@@ -2430,15 +2229,27 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
         cmd->reap = true;
 
     if (ret == 0 && cmd->flags & VIR_EXEC_ASYNC_IO) {
-        cmd->flags &= ~VIR_EXEC_ASYNC_IO;
-        if (cmd->inbuf && cmd->infd != -1) {
-            /* close the read end of infd and replace it with the write end */
+        if (cmd->inbuf)
             VIR_FORCE_CLOSE(cmd->infd);
-            cmd->infd = infd[1];
+        /* clear any error so we can catch if the helper thread reports one */
+        cmd->has_error = 0;
+        if (VIR_ALLOC(cmd->asyncioThread) < 0 ||
+            virThreadCreate(cmd->asyncioThread, true,
+                            virCommandDoAsyncIOHelper, cmd) < 0) {
+            virReportSystemError(errno, "%s",
+                                 _("Unable to create thread "
+                                   "to process command's IO"));
+            VIR_FREE(cmd->asyncioThread);
+            virCommandAbort(cmd);
+            ret = -1;
         }
-        ret = virCommandRegisterEventLoop(cmd);
     }
 
+cleanup:
+    if (ret < 0) {
+        VIR_FORCE_CLOSE(cmd->infd);
+        VIR_FORCE_CLOSE(cmd->inpipe);
+    }
     return ret;
 }
 
@@ -2459,7 +2270,6 @@ virCommandWait(virCommandPtr cmd, int *exitstatus)
 {
     int ret;
     int status = 0;
-    const int events = VIR_EVENT_HANDLE_READABLE | VIR_EVENT_HANDLE_HANGUP;
 
     if (!cmd ||cmd->has_error == ENOMEM) {
         virReportOOMError();
@@ -2484,24 +2294,20 @@ virCommandWait(virCommandPtr cmd, int *exitstatus)
      * guarantee that virProcessWait only fails due to failure to wait,
      * and repeat the exitstatus check code ourselves.  */
     ret = virProcessWait(cmd->pid, exitstatus ? exitstatus : &status);
-
-    if (cmd->inWatch != -1) {
-        virEventRemoveHandle(cmd->inWatch);
-        cmd->inWatch = -1;
-    }
-
-    if (cmd->outWatch != -1) {
-        virEventRemoveHandle(cmd->outWatch);
-        virCommandHandleReadWrite(cmd->outWatch, cmd->outfd, events, cmd);
-        cmd->outWatch = -1;
-    }
-
-    if (cmd->errWatch != -1) {
-        virEventRemoveHandle(cmd->errWatch);
-        virCommandHandleReadWrite(cmd->errWatch, cmd->errfd, events, cmd);
-        cmd->errWatch = -1;
+    if (cmd->flags & VIR_EXEC_ASYNC_IO) {
+        cmd->flags &= ~VIR_EXEC_ASYNC_IO;
+        virThreadJoin(cmd->asyncioThread);
+        VIR_FREE(cmd->asyncioThread);
+        VIR_FORCE_CLOSE(cmd->inpipe);
+        if (cmd->has_error) {
+            const char *msg = _("Error while processing command's IO");
+            if (cmd->has_error < 0)
+                virReportError(VIR_ERR_INTERNAL_ERROR, "%s", msg);
+            else
+                virReportSystemError(cmd->has_error, "%s", msg);
+            ret = -1;
+        }
     }
-
     if (ret == 0) {
         cmd->pid = -1;
         cmd->reap = false;
@@ -2719,6 +2525,10 @@ virCommandFree(virCommandPtr cmd)
         VIR_FORCE_CLOSE(cmd->transfer[i]);
     }
 
+    if (cmd->asyncioThread) {
+        virThreadJoin(cmd->asyncioThread);
+        VIR_FREE(cmd->asyncioThread);
+    }
     VIR_FREE(cmd->inbuf);
     VIR_FORCE_CLOSE(cmd->outfd);
     VIR_FORCE_CLOSE(cmd->errfd);
-- 
1.8.0.2


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]