[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

Re: [libvirt] PATCH: 18/25: Dynamic thread workers pool



The libvirtd.conf file has three parameters

  max_clients
  min_workers
  max_workers

When the daemon starts up it spawns min_workers threads. It
accepts connections from up to max_clients. I never implemented
the logic to auto-spawn more threads up to max_workers though.

This patch addresses that. Upon accept()ing a client connection
if the number of clients is greater than the number of active
worker threads, we spawn another worker, unless we've hit the
max workers limit. If the number of clients is greater than
the max_workers, this means some clients may have to wait for
other clients' requests to finish before being processed. No
great problem.

This also fixes a shutdown problem. We were marking the threads
as detached, but still calling pthread_join() on them. This gives
an error on Linux, but just hangs on Solaris while it tries to
join a thread that has no intention of exiting.

So during shutdown we set a 'quit' flag on the worker, and then
broadcast a signal to wake it up from its condition variable
sleep. Upon wakeup it notices the quit flag and exits, allowing
us to join & cleanup.

 qemud.c |  109 +++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
 qemud.h |   13 +++++++
 2 files changed, 106 insertions(+), 16 deletions(-)

Daniel

diff --git a/qemud/qemud.c b/qemud/qemud.c
--- a/qemud/qemud.c
+++ b/qemud/qemud.c
@@ -167,7 +167,7 @@ static void sig_handler(int sig, siginfo
 
 static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
 static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-
+static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
 
 void
 qemudClientMessageQueuePush(struct qemud_client_message **queue,
@@ -1248,6 +1248,20 @@ static int qemudDispatchServer(struct qe
 
     server->clients[server->nclients++] = client;
 
+    if (server->nclients > server->nactiveworkers &&
+        server->nactiveworkers < server->nworkers) {
+        int i;
+        for (i = 0 ; i < server->nworkers ; i++) {
+            if (!server->workers[i].active) {
+                if (qemudStartWorker(server, &server->workers[i]) < 0)
+                    return -1;
+                server->nactiveworkers++;
+                break;
+            }
+        }
+    }
+
+
     return 0;
 
  cleanup:
@@ -1303,19 +1317,28 @@ static struct qemud_client *qemudPending
 
 static void *qemudWorker(void *data)
 {
-    struct qemud_server *server = data;
+    struct qemud_worker *worker = data;
+    struct qemud_server *server = worker->server;
 
     while (1) {
         struct qemud_client *client = NULL;
         struct qemud_client_message *reply;
 
         virMutexLock(&server->lock);
-        while ((client = qemudPendingJob(server)) == NULL) {
+        while (((client = qemudPendingJob(server)) == NULL) &&
+               !worker->quit) {
             if (virCondWait(&server->job, &server->lock) < 0) {
                 virMutexUnlock(&server->lock);
                 return NULL;
             }
         }
+        if (worker->quit) {
+            if (client)
+                virMutexUnlock(&client->lock);
+            virMutexUnlock(&server->lock);
+            return NULL;
+        }
+        worker->processing = 1;
         virMutexUnlock(&server->lock);
 
         /* We own a locked client now... */
@@ -1342,9 +1365,40 @@ static void *qemudWorker(void *data)
 
         client->refs--;
         virMutexUnlock(&client->lock);
+
+        virMutexLock(&server->lock);
+        worker->processing = 0;
+        virMutexUnlock(&server->lock);
     }
 }
 
+static int qemudStartWorker(struct qemud_server *server,
+                            struct qemud_worker *worker) {
+    pthread_attr_t attr;
+    pthread_attr_init(&attr);
+    /* We want to join workers, so don't detach them */
+    /*pthread_attr_setdetachstate(&attr, 1);*/
+
+    if (worker->active)
+        return -1;
+
+    worker->server = server;
+    worker->active = 1;
+    worker->quit = 0;
+    worker->processing = 0;
+
+    if (pthread_create(&worker->thread,
+                       &attr,
+                       qemudWorker,
+                       worker) != 0) {
+        worker->active = 0;
+        worker->server = NULL;
+        return -1;
+    }
+
+    return 0;
+}
+
 
 /*
  * Read data into buffer using wire decoding (plain or TLS)
@@ -1888,21 +1942,19 @@ static int qemudRunLoop(struct qemud_ser
 
     virMutexLock(&server->lock);
 
-    server->nworkers = min_workers;
+    if (min_workers > max_workers)
+        max_workers = min_workers;
+
+    server->nworkers = max_workers;
     if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) {
         VIR_ERROR0(_("Failed to allocate workers"));
         return -1;
     }
 
-    for (i = 0 ; i < server->nworkers ; i++) {
-        pthread_attr_t attr;
-        pthread_attr_init(&attr);
-        pthread_attr_setdetachstate(&attr, 1);
-
-        pthread_create(&server->workers[i],
-                       &attr,
-                       qemudWorker,
-                       server);
+    for (i = 0 ; i < min_workers ; i++) {
+        if (qemudStartWorker(server, &server->workers[i]) < 0)
+            goto cleanup;
+        server->nactiveworkers++;
     }
 
     for (;;) {
@@ -1948,6 +2000,26 @@ static int qemudRunLoop(struct qemud_ser
             }
         }
 
+        /* If number of active workers exceeds both the min_workers
+         * threshold and the number of clients, then kill some
+         * off */
+        for (i = 0 ; (i < server->nworkers &&
+                      server->nactiveworkers > server->nclients &&
+                      server->nactiveworkers > min_workers) ; i++) {
+
+            if (server->workers[i].active &&
+                !server->workers[i].processing) {
+                server->workers[i].quit = 1;
+
+                virCondBroadcast(&server->job);
+                virMutexUnlock(&server->lock);
+                pthread_join(server->workers[i].thread, NULL);
+                virMutexLock(&server->lock);
+                server->workers[i].active = 0;
+                server->nactiveworkers--;
+            }
+        }
+
         /* Unregister any timeout that's active, since we
          * just had an event processed
          */
@@ -1963,11 +2035,18 @@ static int qemudRunLoop(struct qemud_ser
         }
     }
 
+cleanup:
     for (i = 0 ; i < server->nworkers ; i++) {
-        pthread_t thread = server->workers[i];
+        if (!server->workers[i].active)
+            continue;
+
+        server->workers[i].quit = 1;
+        virCondBroadcast(&server->job);
+
         virMutexUnlock(&server->lock);
-        pthread_join(thread, NULL);
+        pthread_join(server->workers[i].thread, NULL);
         virMutexLock(&server->lock);
+        server->workers[i].active = 0;
     }
     VIR_FREE(server->workers);
 
diff --git a/qemud/qemud.h b/qemud/qemud.h
--- a/qemud/qemud.h
+++ b/qemud/qemud.h
@@ -159,13 +159,24 @@ struct qemud_socket {
     struct qemud_socket *next;
 };
 
+struct qemud_worker {
+    pthread_t thread;
+    int active :1;
+    int processing :1;
+    int quit : 1;
+
+    /* back-pointer to our server */
+    struct qemud_server *server;
+};
+
 /* Main server state */
 struct qemud_server {
     virMutex lock;
     virCond job;
 
     int nworkers;
-    pthread_t *workers;
+    int nactiveworkers;
+    struct qemud_worker *workers;
     int nsockets;
     struct qemud_socket *sockets;
     int nclients;

-- 
|: Red Hat, Engineering, London   -o-   http://people.redhat.com/berrange/ :|
|: http://libvirt.org  -o-  http://virt-manager.org  -o-  http://ovirt.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505  -o-  F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]