[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

Re: [libvirt] PATCH: 18/25: Dynamic thread workers pool



On Tue, Jan 20, 2009 at 11:21:03AM +0100, Jim Meyering wrote:
> "Daniel P. Berrange" <berrange redhat com> wrote:
> > The libvirtd.conf file has three parameters
> >
> >   max_clients
> >   min_workers
> >   max_workers
> >
> > When the daemon starts up it spawns min_workers threads. It
> > accepts connections from upto max_clients. I never implemented
> > the logic to auto-spawn more threads upto max_workers though.
> ...
> 
> Other than the request to rename "active",
> and the signedness nits below, this looks fine.
> ACK
> 
> > diff --git a/qemud/qemud.h b/qemud/qemud.h
> ...
> >  struct qemud_server {
> >      virMutex lock;
> >      virCond job;
> >
> >      int nworkers;
> > -    pthread_t *workers;
> > +    int nactiveworkers;
> > +    struct qemud_worker *workers;
> 
> It'd be nice to make all of these "n*" members unsigned,
> assuming that they should never go negative.

I've not changed this to unsigned, because again we compare it
in various places to the signed in config setting, and I'd
prefer we address all this in one go
 
> >      int nsockets;
> >      struct qemud_socket *sockets;
> >      int nclients;

Here is an update with the 3 flags renamed to something more clear


Daniel

diff --git a/qemud/qemud.c b/qemud/qemud.c
--- a/qemud/qemud.c
+++ b/qemud/qemud.c
@@ -167,7 +167,7 @@ static void sig_handler(int sig, siginfo
 
 static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
 static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-
+static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
 
 void
 qemudClientMessageQueuePush(struct qemud_client_message **queue,
@@ -1249,6 +1249,20 @@ static int qemudDispatchServer(struct qe
 
     server->clients[server->nclients++] = client;
 
+    if (server->nclients > server->nactiveworkers &&
+        server->nactiveworkers < server->nworkers) {
+        int i;
+        for (i = 0 ; i < server->nworkers ; i++) {
+            if (!server->workers[i].hasThread) {
+                if (qemudStartWorker(server, &server->workers[i]) < 0)
+                    return -1;
+                server->nactiveworkers++;
+                break;
+            }
+        }
+    }
+
+
     return 0;
 
  cleanup:
@@ -1304,19 +1318,28 @@ static struct qemud_client *qemudPending
 
 static void *qemudWorker(void *data)
 {
-    struct qemud_server *server = data;
+    struct qemud_worker *worker = data;
+    struct qemud_server *server = worker->server;
 
     while (1) {
         struct qemud_client *client = NULL;
         struct qemud_client_message *reply;
 
         virMutexLock(&server->lock);
-        while ((client = qemudPendingJob(server)) == NULL) {
+        while (((client = qemudPendingJob(server)) == NULL) &&
+               !worker->quitRequest) {
             if (virCondWait(&server->job, &server->lock) < 0) {
                 virMutexUnlock(&server->lock);
                 return NULL;
             }
         }
+        if (worker->quitRequest) {
+            if (client)
+                virMutexUnlock(&client->lock);
+            virMutexUnlock(&server->lock);
+            return NULL;
+        }
+        worker->processingCall = 1;
         virMutexUnlock(&server->lock);
 
         /* We own a locked client now... */
@@ -1343,9 +1366,40 @@ static void *qemudWorker(void *data)
 
         client->refs--;
         virMutexUnlock(&client->lock);
+
+        virMutexLock(&server->lock);
+        worker->processingCall = 0;
+        virMutexUnlock(&server->lock);
     }
 }
 
+static int qemudStartWorker(struct qemud_server *server,
+                            struct qemud_worker *worker) {
+    pthread_attr_t attr;
+    pthread_attr_init(&attr);
+    /* We want to join workers, so don't detach them */
+    /*pthread_attr_setdetachstate(&attr, 1);*/
+
+    if (worker->hasThread)
+        return -1;
+
+    worker->server = server;
+    worker->hasThread = 1;
+    worker->quitRequest = 0;
+    worker->processingCall = 0;
+
+    if (pthread_create(&worker->thread,
+                       &attr,
+                       qemudWorker,
+                       worker) != 0) {
+        worker->hasThread = 0;
+        worker->server = NULL;
+        return -1;
+    }
+
+    return 0;
+}
+
 
 /*
  * Read data into buffer using wire decoding (plain or TLS)
@@ -1942,21 +1996,19 @@ static int qemudRunLoop(struct qemud_ser
 
     virMutexLock(&server->lock);
 
-    server->nworkers = min_workers;
+    if (min_workers > max_workers)
+        max_workers = min_workers;
+
+    server->nworkers = max_workers;
     if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) {
         VIR_ERROR0(_("Failed to allocate workers"));
         return -1;
     }
 
-    for (i = 0 ; i < server->nworkers ; i++) {
-        pthread_attr_t attr;
-        pthread_attr_init(&attr);
-        pthread_attr_setdetachstate(&attr, 1);
-
-        pthread_create(&server->workers[i],
-                       &attr,
-                       qemudWorker,
-                       server);
+    for (i = 0 ; i < min_workers ; i++) {
+        if (qemudStartWorker(server, &server->workers[i]) < 0)
+            goto cleanup;
+        server->nactiveworkers++;
     }
 
     for (;;) {
@@ -2002,6 +2054,26 @@ static int qemudRunLoop(struct qemud_ser
             }
         }
 
+        /* If number of active workers exceeds both the min_workers
+         * threshold and the number of clients, then kill some
+         * off */
+        for (i = 0 ; (i < server->nworkers &&
+                      server->nactiveworkers > server->nclients &&
+                      server->nactiveworkers > min_workers) ; i++) {
+
+            if (server->workers[i].hasThread &&
+                !server->workers[i].processingCall) {
+                server->workers[i].quitRequest = 1;
+
+                virCondBroadcast(&server->job);
+                virMutexUnlock(&server->lock);
+                pthread_join(server->workers[i].thread, NULL);
+                virMutexLock(&server->lock);
+                server->workers[i].hasThread = 0;
+                server->nactiveworkers--;
+            }
+        }
+
         /* Unregister any timeout that's active, since we
          * just had an event processed
          */
@@ -2017,11 +2089,18 @@ static int qemudRunLoop(struct qemud_ser
         }
     }
 
+cleanup:
     for (i = 0 ; i < server->nworkers ; i++) {
-        pthread_t thread = server->workers[i];
+        if (!server->workers[i].hasThread)
+            continue;
+
+        server->workers[i].quitRequest = 1;
+        virCondBroadcast(&server->job);
+
         virMutexUnlock(&server->lock);
-        pthread_join(thread, NULL);
+        pthread_join(server->workers[i].thread, NULL);
         virMutexLock(&server->lock);
+        server->workers[i].hasThread = 0;
     }
     VIR_FREE(server->workers);
 
diff --git a/qemud/qemud.h b/qemud/qemud.h
--- a/qemud/qemud.h
+++ b/qemud/qemud.h
@@ -157,13 +157,24 @@ struct qemud_socket {
     struct qemud_socket *next;
 };
 
+struct qemud_worker {
+    pthread_t thread;
+    int hasThread :1;
+    int processingCall :1;
+    int quitRequest : 1;
+
+    /* back-pointer to our server */
+    struct qemud_server *server;
+};
+
 /* Main server state */
 struct qemud_server {
     virMutex lock;
     virCond job;
 
     int nworkers;
-    pthread_t *workers;
+    int nactiveworkers;
+    struct qemud_worker *workers;
     int nsockets;
     struct qemud_socket *sockets;
     int nclients;


-- 
|: Red Hat, Engineering, London   -o-   http://people.redhat.com/berrange/ :|
|: http://libvirt.org  -o-  http://virt-manager.org  -o-  http://ovirt.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505  -o-  F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]