[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[libvirt] [PATCH] daemon: Dynamically create worker threads when some get stuck



Up to now, we've created new worker threads only during new connection.
This patch monitors worker threads for liveness and dynamically create
new one if all are stuck, waiting for hypervisor to reply. This
situation can happen. All one need to do is send STOP signal to qemu.
The amount of time when we evaluate thread as stuck is defined in
WORKER_TIMEOUT macro.

With this approach we don't need to create new worker thread on incoming
connection. However, as number of active worker threads grows, it might
happen we need to size up the pool of worker threads and hence exceed
the max_worker configuration value.
---
 daemon/libvirtd.c |   77 +++++++++++++++++++++++++++++++++++++++++++---------
 daemon/libvirtd.h |    4 +++
 2 files changed, 67 insertions(+), 14 deletions(-)

diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
index bcaa37b..df019b3 100644
--- a/daemon/libvirtd.c
+++ b/daemon/libvirtd.c
@@ -1491,19 +1491,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
 
     server->clients[server->nclients++] = client;
 
-    if (server->nclients > server->nactiveworkers &&
-        server->nactiveworkers < server->nworkers) {
-        for (i = 0 ; i < server->nworkers ; i++) {
-            if (!server->workers[i].hasThread) {
-                if (qemudStartWorker(server, &server->workers[i]) < 0)
-                    return -1;
-                server->nactiveworkers++;
-                break;
-            }
-        }
-    }
-
-
     return 0;
 
 error:
@@ -1629,6 +1616,9 @@ static void *qemudWorker(void *data)
 
         virMutexLock(&server->lock);
         worker->processingCall = 0;
+        /* The fact we're here means, this worker is not stuck
+         * and can serve clients. Thus suspend timeout */
+        virEventUpdateTimeout(server->workerTimeout, -1);
         virMutexUnlock(&server->lock);
     }
 }
@@ -1900,8 +1890,12 @@ readmore:
         }
 
         /* Move completed message to the end of the dispatch queue */
-        if (msg)
+        if (msg) {
             qemudClientMessageQueuePush(&client->dx, msg);
+            /* For queued message set timeout. If a worker thread
+             * will serve this, it will also remove this timeout. */
+            virEventUpdateTimeout(server->workerTimeout, WORKER_TIMEOUT);
+        }
         client->nrequests++;
 
         /* Possibly need to create another receive buffer */
@@ -2303,6 +2297,53 @@ static void qemudInactiveTimer(int timerid, void *data) {
     }
 }
 
+static void qemudWorkerTimeout(int timerid, void *data) {
+    struct qemud_server *server = (struct qemud_server *)data;
+    int i;
+
+    virMutexLock(&server->lock);
+    VIR_DEBUG("Some threads are not responding");
+    for (i = 0; i < server->nclients; i++) {
+        if (server->clients[i]->dx)
+            break;
+    }
+
+    if (i == server->nclients) {
+        /* All clients have been/are being served,
+         * there is no call waiting in the queue */
+        virEventUpdateTimeout(timerid, -1);
+    } else {
+        VIR_DEBUG("Got a queued call. Need to create worker");
+        if (server->nactiveworkers == server->nworkers) {
+            /* We need to size up worker threads pool */
+            if (VIR_REALLOC_N(server->workers, server->nworkers + 1) < 0) {
+                virReportOOMError();
+                goto cleanup;
+            }
+            /* Init new worker structure */
+            memset(&server->workers[server->nworkers], 0,
+                   sizeof(struct qemud_worker));
+            server->nworkers++;
+        }
+
+        /* Find free worker and start it */
+        for (i = 0; i < server->nworkers; i++) {
+            if (!server->workers[i].hasThread) {
+                if (qemudStartWorker(server, &server->workers[i]) < 0) {
+                    VIR_DEBUG("Could not start new worker");
+                    goto cleanup;
+                }
+                server->nactiveworkers++;
+                VIR_DEBUG("created worker %d", i);
+                break;
+            }
+        }
+    }
+
+cleanup:
+    virMutexUnlock(&server->lock);
+}
+
 static void qemudFreeClient(struct qemud_client *client) {
     while (client->rx) {
         struct qemud_client_message *msg
@@ -2346,6 +2387,14 @@ static void *qemudRunLoop(void *opaque) {
         return NULL;
     }
 
+    server->workerTimeout = virEventAddTimeout(-1,
+                                               qemudWorkerTimeout,
+                                               server, NULL);
+    if (server->workerTimeout < 0) {
+        VIR_ERROR(_("Failed to register worker threads timeout"));
+        return NULL;
+    }
+
     if (min_workers > max_workers)
         max_workers = min_workers;
 
diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h
index ea00d5c..1d828be 100644
--- a/daemon/libvirtd.h
+++ b/daemon/libvirtd.h
@@ -271,6 +271,8 @@ struct qemud_worker {
     struct qemud_server *server;
 };
 
+#define WORKER_TIMEOUT 3000
+
 /* Main server state */
 struct qemud_server {
     virMutex lock;
@@ -278,6 +280,8 @@ struct qemud_server {
 
     int privileged;
 
+    int workerTimeout;
+
     size_t nworkers;
     size_t nactiveworkers;
     struct qemud_worker *workers;
-- 
1.7.5.rc3


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]