[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[libvirt] [PATCH v4 RESEND 4/4] Use threadpool API to manage qemud worker

Replace the hand-rolled worker-thread management in libvirtd (qemudStartWorker,
qemudPendingJob, and the per-worker quit/join logic in qemudRunLoop) with the
generic thread pool API: the pool is created with virThreadPoolNew() in
qemudRunLoop, completed client messages are handed off via
virThreadPoolSendJob(), and the pool is torn down with virThreadPoolFree().
A back-pointer from qemud_client to its server is added so the worker callback
can reach the server without per-worker state.

---
 daemon/libvirtd.c |  172 +++++++++--------------------------------------------
 daemon/libvirtd.h |    4 +
 2 files changed, 33 insertions(+), 143 deletions(-)

diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
index 791b3dc..dbd050a 100644
--- a/daemon/libvirtd.c
+++ b/daemon/libvirtd.c
@@ -67,6 +67,7 @@
 #include "stream.h"
 #include "hooks.h"
 #include "virtaudit.h"
+#include "threadpool.h"
 #ifdef HAVE_AVAHI
 # include "mdns.h"
 #endif
@@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo,
 
 static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
 static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
 
 void
 qemudClientMessageQueuePush(struct qemud_client_message **queue,
@@ -1383,6 +1383,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
     client->auth = sock->auth;
     client->addr = addr;
     client->addrstr = addrstr;
+    client->server = server;
     addrstr = NULL;
 
     for (i = 0 ; i < VIR_DOMAIN_EVENT_ID_LAST ; i++) {
@@ -1458,19 +1459,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
 
     server->clients[server->nclients++] = client;
 
-    if (server->nclients > server->nactiveworkers &&
-        server->nactiveworkers < server->nworkers) {
-        for (i = 0 ; i < server->nworkers ; i++) {
-            if (!server->workers[i].hasThread) {
-                if (qemudStartWorker(server, &server->workers[i]) < 0)
-                    return -1;
-                server->nactiveworkers++;
-                break;
-            }
-        }
-    }
-
-
     return 0;
 
 error:
@@ -1534,100 +1522,27 @@ void qemudDispatchClientFailure(struct qemud_client *client) {
     VIR_FREE(client->addrstr);
 }
 
-
-/* Caller must hold server lock */
-static struct qemud_client *qemudPendingJob(struct qemud_server *server)
+static void qemudWorker(void *data, void *opaque ATTRIBUTE_UNUSED)
 {
-    int i;
-    for (i = 0 ; i < server->nclients ; i++) {
-        virMutexLock(&server->clients[i]->lock);
-        if (server->clients[i]->dx) {
-            /* Delibrately don't unlock client - caller wants the lock */
-            return server->clients[i];
-        }
-        virMutexUnlock(&server->clients[i]->lock);
-    }
-    return NULL;
-}
+    struct qemud_client *client = data;
+    struct qemud_client_message *msg;
 
-static void *qemudWorker(void *data)
-{
-    struct qemud_worker *worker = data;
-    struct qemud_server *server = worker->server;
+    virMutexLock(&client->lock);
 
-    while (1) {
-        struct qemud_client *client = NULL;
-        struct qemud_client_message *msg;
+    /* Remove our message from dispatch queue while we use it */
+    msg = qemudClientMessageQueueServe(&client->dx);
 
-        virMutexLock(&server->lock);
-        while ((client = qemudPendingJob(server)) == NULL) {
-            if (worker->quitRequest ||
-                virCondWait(&server->job, &server->lock) < 0) {
-                virMutexUnlock(&server->lock);
-                return NULL;
-            }
-        }
-        if (worker->quitRequest) {
-            virMutexUnlock(&client->lock);
-            virMutexUnlock(&server->lock);
-            return NULL;
-        }
-        worker->processingCall = 1;
-        virMutexUnlock(&server->lock);
-
-        /* We own a locked client now... */
-        client->refs++;
-
-        /* Remove our message from dispatch queue while we use it */
-        msg = qemudClientMessageQueueServe(&client->dx);
-
-        /* This function drops the lock during dispatch,
-         * and re-acquires it before returning */
-        if (remoteDispatchClientRequest (server, client, msg) < 0) {
-            VIR_FREE(msg);
-            qemudDispatchClientFailure(client);
-            client->refs--;
-            virMutexUnlock(&client->lock);
-            continue;
-        }
-
-        client->refs--;
-        virMutexUnlock(&client->lock);
-
-        virMutexLock(&server->lock);
-        worker->processingCall = 0;
-        virMutexUnlock(&server->lock);
-    }
-}
-
-static int qemudStartWorker(struct qemud_server *server,
-                            struct qemud_worker *worker) {
-    pthread_attr_t attr;
-    pthread_attr_init(&attr);
-    /* We want to join workers, so don't detach them */
-    /*pthread_attr_setdetachstate(&attr, 1);*/
-
-    if (worker->hasThread)
-        return -1;
-
-    worker->server = server;
-    worker->hasThread = 1;
-    worker->quitRequest = 0;
-    worker->processingCall = 0;
-
-    if (pthread_create(&worker->thread,
-                       &attr,
-                       qemudWorker,
-                       worker) != 0) {
-        worker->hasThread = 0;
-        worker->server = NULL;
-        return -1;
+    /* This function drops the lock during dispatch,
+     * and re-acquires it before returning */
+    if (remoteDispatchClientRequest (client->server, client, msg) < 0) {
+        VIR_FREE(msg);
+        qemudDispatchClientFailure(client);
     }
 
-    return 0;
+    client->refs--;
+    virMutexUnlock(&client->lock);
 }
 
-
 /*
  * Read data into buffer using wire decoding (plain or TLS)
  *
@@ -1857,8 +1772,11 @@ readmore:
         }
 
         /* Move completed message to the end of the dispatch queue */
-        if (msg)
+        if (msg) {
+            client->refs++;
             qemudClientMessageQueuePush(&client->dx, msg);
+            ignore_value(virThreadPoolSendJob(server->workerPool, client));
+        }
         client->nrequests++;
 
         /* Possibly need to create another receive buffer */
@@ -1870,9 +1788,6 @@ readmore:
                 client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
 
             qemudUpdateClientEvent(client);
-
-            /* Tell one of the workers to get on with it... */
-            virCondSignal(&server->job);
         }
     }
 }
@@ -2311,10 +2226,14 @@ static void *qemudRunLoop(void *opaque) {
         return NULL;
     }
 
-    for (i = 0 ; i < min_workers ; i++) {
-        if (qemudStartWorker(server, &server->workers[i]) < 0)
-            goto cleanup;
-        server->nactiveworkers++;
+    server->workerPool = virThreadPoolNew(min_workers,
+                                          max_workers,
+                                          qemudWorker,
+                                          NULL);
+    if (!server->workerPool) {
+        VIR_ERROR0(_("Failed to create thread pool"));
+        virMutexUnlock(&server->lock);
+        return NULL;
     }
 
     for (;!server->quitEventThread;) {
@@ -2367,47 +2286,14 @@ static void *qemudRunLoop(void *opaque) {
                 goto reprocess;
             }
         }
-
-        /* If number of active workers exceeds both the min_workers
-         * threshold and the number of clients, then kill some
-         * off */
-        for (i = 0 ; (i < server->nworkers &&
-                      server->nactiveworkers > server->nclients &&
-                      server->nactiveworkers > min_workers) ; i++) {
-
-            if (server->workers[i].hasThread &&
-                !server->workers[i].processingCall) {
-                server->workers[i].quitRequest = 1;
-
-                virCondBroadcast(&server->job);
-                virMutexUnlock(&server->lock);
-                pthread_join(server->workers[i].thread, NULL);
-                virMutexLock(&server->lock);
-                server->workers[i].hasThread = 0;
-                server->nactiveworkers--;
-            }
-        }
-    }
-
-cleanup:
-    for (i = 0 ; i < server->nworkers ; i++) {
-        if (!server->workers[i].hasThread)
-            continue;
-
-        server->workers[i].quitRequest = 1;
-        virCondBroadcast(&server->job);
-
-        virMutexUnlock(&server->lock);
-        pthread_join(server->workers[i].thread, NULL);
-        virMutexLock(&server->lock);
-        server->workers[i].hasThread = 0;
     }
-    VIR_FREE(server->workers);
     for (i = 0; i < server->nclients; i++)
         qemudFreeClient(server->clients[i]);
     server->nclients = 0;
     VIR_SHRINK_N(server->clients, server->nclients_max, server->nclients_max);
 
+    virThreadPoolFree(server->workerPool);
+    server->workerPool = NULL;
     virMutexUnlock(&server->lock);
     return NULL;
 }
diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h
index af20e56..d7e10dc 100644
--- a/daemon/libvirtd.h
+++ b/daemon/libvirtd.h
@@ -49,6 +49,7 @@
 # include "logging.h"
 # include "threads.h"
 # include "network.h"
+# include "threadpool.h"
 
 # if WITH_DTRACE
 #  ifndef LIBVIRTD_PROBES_H
@@ -192,6 +193,8 @@ struct qemud_client {
 
     int magic;
 
+    struct qemud_server *server;
+
     int fd;
     int watch;
     unsigned int readonly :1;
@@ -283,6 +286,7 @@ struct qemud_server {
 
     int privileged;
 
+    virThreadPoolPtr workerPool;
     size_t nworkers;
     size_t nactiveworkers;
     struct qemud_worker *workers;
-- 
1.7.3


-- 
Thanks,
Hu Tao


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]