[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[libvirt] [PATCH v5 07/13] Add support for async close of client RPC socket



---
Notes:
    Version 5:
    - rebased on top of DanB's non-blocking patches; this is the only part that
      required non-trivial rebase so I'm posting it for additional review
    
    Version 4:
    - no changes
    
    Version 3:
    - no changes
    
    Version 2:
    - no changes

 src/rpc/virnetclient.c |   99 +++++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 90 insertions(+), 9 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 025d270..b4b2fe7 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -101,9 +101,13 @@ struct _virNetClient {
 
     size_t nstreams;
     virNetClientStreamPtr *streams;
+
+    bool wantClose;
 };
 
 
+void virNetClientRequestClose(virNetClientPtr client);
+
 static void virNetClientLock(virNetClientPtr client)
 {
     virMutexLock(&client->lock);
@@ -409,12 +413,14 @@ void virNetClientFree(virNetClientPtr client)
 }
 
 
-void virNetClientClose(virNetClientPtr client)
+static void
+virNetClientCloseLocked(virNetClientPtr client)
 {
-    if (!client)
+    VIR_DEBUG("client=%p, sock=%p", client, client->sock);
+
+    if (!client->sock)
         return;
 
-    virNetClientLock(client);
     virNetSocketRemoveIOCallback(client->sock);
     virNetSocketFree(client->sock);
     client->sock = NULL;
@@ -424,6 +430,41 @@ void virNetClientClose(virNetClientPtr client)
     virNetSASLSessionFree(client->sasl);
     client->sasl = NULL;
 #endif
+    client->wantClose = false;
+}
+
+void virNetClientClose(virNetClientPtr client)
+{
+    if (!client)
+        return;
+
+    virNetClientLock(client);
+    virNetClientCloseLocked(client);
+    virNetClientUnlock(client);
+}
+
+void
+virNetClientRequestClose(virNetClientPtr client)
+{
+    VIR_DEBUG("client=%p", client);
+
+    virNetClientLock(client);
+
+    /* If there is a thread polling for data on the socket, set wantClose flag
+     * and wake the thread up or just immediately close the socket when no-one
+     * is polling on it.
+     */
+    if (client->waitDispatch) {
+        char ignore = 1;
+        int len = sizeof(ignore);
+
+        client->wantClose = true;
+        if (safewrite(client->wakeupSendFD, &ignore, len) != len)
+            VIR_ERROR(_("failed to wake up polling thread"));
+    } else {
+        virNetClientCloseLocked(client);
+    }
+
     virNetClientUnlock(client);
 }
 
@@ -1096,6 +1137,26 @@ static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call,
 }
 
 
+static void
+virNetClientIOEventLoopRemoveAll(virNetClientPtr client,
+                                 virNetClientCallPtr thiscall)
+{
+    if (!client->waitDispatch)
+        return;
+
+    if (client->waitDispatch == thiscall) {
+        /* just pretend nothing was sent and the caller will free the call */
+        thiscall->sentSomeData = false;
+    } else {
+        virNetClientCallPtr call = client->waitDispatch;
+        virNetClientCallRemove(&client->waitDispatch, call);
+        ignore_value(virCondDestroy(&call->cond));
+        VIR_FREE(call->msg);
+        VIR_FREE(call);
+    }
+}
+
+
 static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall)
 {
     VIR_DEBUG("Giving up the buck %p", thiscall);
@@ -1110,7 +1171,12 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli
         }
         tmp = tmp->next;
     }
+
     VIR_DEBUG("No thread to pass the buck to");
+    if (client->wantClose) {
+        virNetClientCloseLocked(client);
+        virNetClientIOEventLoopRemoveAll(client, thiscall);
+    }
 }
 
 
@@ -1141,11 +1207,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         sigset_t oldmask, blockedsigs;
         int timeout = -1;
 
-        /* If we have existing SASL decoded data we
-         * don't want to sleep in the poll(), just
-         * check if any other FDs are also ready
+        /* If we have existing SASL decoded data we don't want to sleep in
+         * the poll(), just check if any other FDs are also ready.
+         * If the connection is going to be closed, we don't want to sleep in
+         * poll() either.
          */
-        if (virNetSocketHasCachedData(client->sock))
+        if (virNetSocketHasCachedData(client->sock) || client->wantClose)
             timeout = 0;
 
         /* If there are any non-blocking calls in the queue,
@@ -1208,6 +1275,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
             fds[0].revents |= POLLIN;
         }
 
+        /* If wantClose flag is set, pretend there was an error on the socket
+         */
+        if (client->wantClose)
+            fds[0].revents = POLLERR;
+
         if (fds[1].revents) {
             VIR_DEBUG("Woken up from poll by other thread");
             if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
@@ -1441,7 +1513,8 @@ static int virNetClientIO(virNetClientPtr client,
     virResetLastError();
     rv = virNetClientIOEventLoop(client, thiscall);
 
-    virNetClientIOUpdateCallback(client, true);
+    if (client->sock)
+        virNetClientIOUpdateCallback(client, true);
 
     if (rv == 0 &&
         virGetLastError())
@@ -1467,7 +1540,7 @@ void virNetClientIncomingEvent(virNetSocketPtr sock,
         goto done;
 
     /* This should be impossible, but it doesn't hurt to check */
-    if (client->haveTheBuck)
+    if (client->haveTheBuck || client->wantClose)
         goto done;
 
     VIR_DEBUG("Event fired %p %d", sock, events);
@@ -1528,6 +1601,12 @@ static int virNetClientSendInternal(virNetClientPtr client,
 
     virNetClientLock(client);
 
+    if (!client->sock || client->wantClose) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("client socket is closed"));
+        goto unlock;
+    }
+
     if (virCondInit(&call->cond) < 0) {
         virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
                     _("cannot initialize condition variable"));
@@ -1554,6 +1633,8 @@ cleanup:
         ignore_value(virCondDestroy(&call->cond));
         VIR_FREE(call);
     }
+
+unlock:
     virNetClientUnlock(client);
     return ret;
 }
-- 
1.7.8.rc3


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]