[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[libvirt] [PATCH 22/28] Rename threadpool.{c, h} to virthreadpool.{c, h}



From: "Daniel P. Berrange" <berrange redhat com>

---
 src/Makefile.am                   |   2 +-
 src/nwfilter/nwfilter_dhcpsnoop.c |   2 +-
 src/qemu/qemu_conf.h              |   2 +-
 src/qemu/qemu_driver.c            |   2 +-
 src/rpc/virnetserver.c            |   2 +-
 src/util/threadpool.c             | 371 --------------------------------------
 src/util/threadpool.h             |  53 ------
 src/util/virthreadpool.c          | 371 ++++++++++++++++++++++++++++++++++++++
 src/util/virthreadpool.h          |  53 ++++++
 9 files changed, 429 insertions(+), 429 deletions(-)
 delete mode 100644 src/util/threadpool.c
 delete mode 100644 src/util/threadpool.h
 create mode 100644 src/util/virthreadpool.c
 create mode 100644 src/util/virthreadpool.h

diff --git a/src/Makefile.am b/src/Makefile.am
index 376c543..e74a3a3 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -56,7 +56,6 @@ UTIL_SOURCES =							\
 		util/threads.c util/threads.h			\
 		util/threads-pthread.h				\
 		util/threads-win32.h				\
-		util/threadpool.c util/threadpool.h		\
 		util/uuid.c util/uuid.h				\
 		util/util.c util/util.h				\
 		util/viralloc.c util/viralloc.h			\
@@ -86,6 +85,7 @@ UTIL_SOURCES =							\
 		util/virstatslinux.c util/virstatslinux.h	\
 		util/virstoragefile.c util/virstoragefile.h	\
 		util/virsysinfo.c util/virsysinfo.h		\
+		util/virthreadpool.c util/virthreadpool.h	\
 		util/virtypedparam.c util/virtypedparam.h	\
 		util/xml.c util/xml.h				\
 		util/virterror.c util/virterror_internal.h	\
diff --git a/src/nwfilter/nwfilter_dhcpsnoop.c b/src/nwfilter/nwfilter_dhcpsnoop.c
index a798e95..c1ab622 100644
--- a/src/nwfilter/nwfilter_dhcpsnoop.c
+++ b/src/nwfilter/nwfilter_dhcpsnoop.c
@@ -65,7 +65,7 @@
 #include "virnetdev.h"
 #include "virfile.h"
 #include "viratomic.h"
-#include "threadpool.h"
+#include "virthreadpool.h"
 #include "configmake.h"
 #include "virtime.h"
 
diff --git a/src/qemu/qemu_conf.h b/src/qemu/qemu_conf.h
index f928c29..0d4816e 100644
--- a/src/qemu/qemu_conf.h
+++ b/src/qemu/qemu_conf.h
@@ -41,7 +41,7 @@
 # include "driver.h"
 # include "virbitmap.h"
 # include "vircommand.h"
-# include "threadpool.h"
+# include "virthreadpool.h"
 # include "locking/lock_manager.h"
 # include "qemu_capabilities.h"
 
diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index 0038d95..170f15d 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -85,7 +85,7 @@
 #include "virfile.h"
 #include "fdstream.h"
 #include "configmake.h"
-#include "threadpool.h"
+#include "virthreadpool.h"
 #include "locking/lock_manager.h"
 #include "locking/domain_lock.h"
 #include "virkeycode.h"
diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c
index 67cd4b5..26ceb0c 100644
--- a/src/rpc/virnetserver.c
+++ b/src/rpc/virnetserver.c
@@ -32,7 +32,7 @@
 #include "viralloc.h"
 #include "virterror_internal.h"
 #include "threads.h"
-#include "threadpool.h"
+#include "virthreadpool.h"
 #include "util.h"
 #include "virfile.h"
 #include "virnetservermdns.h"
diff --git a/src/util/threadpool.c b/src/util/threadpool.c
deleted file mode 100644
index 9d3d5d2..0000000
--- a/src/util/threadpool.c
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * threadpool.c: a generic thread pool implementation
- *
- * Copyright (C) 2010 Hu Tao
- * Copyright (C) 2010 Daniel P. Berrange
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library.  If not, see
- * <http://www.gnu.org/licenses/>.
- *
- * Authors:
- *     Hu Tao <hutao cn fujitsu com>
- *     Daniel P. Berrange <berrange redhat com>
- */
-
-#include <config.h>
-
-#include "threadpool.h"
-#include "viralloc.h"
-#include "threads.h"
-#include "virterror_internal.h"
-
-#define VIR_FROM_THIS VIR_FROM_NONE
-
-typedef struct _virThreadPoolJob virThreadPoolJob;
-typedef virThreadPoolJob *virThreadPoolJobPtr;
-
-struct _virThreadPoolJob {
-    virThreadPoolJobPtr prev;
-    virThreadPoolJobPtr next;
-    unsigned int priority;
-
-    void *data;
-};
-
-typedef struct _virThreadPoolJobList virThreadPoolJobList;
-typedef virThreadPoolJobList *virThreadPoolJobListPtr;
-
-struct _virThreadPoolJobList {
-    virThreadPoolJobPtr head;
-    virThreadPoolJobPtr tail;
-    virThreadPoolJobPtr firstPrio;
-};
-
-
-struct _virThreadPool {
-    bool quit;
-
-    virThreadPoolJobFunc jobFunc;
-    void *jobOpaque;
-    virThreadPoolJobList jobList;
-    size_t jobQueueDepth;
-
-    virMutex mutex;
-    virCond cond;
-    virCond quit_cond;
-
-    size_t maxWorkers;
-    size_t minWorkers;
-    size_t freeWorkers;
-    size_t nWorkers;
-    virThreadPtr workers;
-
-    size_t nPrioWorkers;
-    virThreadPtr prioWorkers;
-    virCond prioCond;
-};
-
-struct virThreadPoolWorkerData {
-    virThreadPoolPtr pool;
-    virCondPtr cond;
-    bool priority;
-};
-
-static void virThreadPoolWorker(void *opaque)
-{
-    struct virThreadPoolWorkerData *data = opaque;
-    virThreadPoolPtr pool = data->pool;
-    virCondPtr cond = data->cond;
-    bool priority = data->priority;
-    virThreadPoolJobPtr job = NULL;
-
-    VIR_FREE(data);
-
-    virMutexLock(&pool->mutex);
-
-    while (1) {
-        while (!pool->quit &&
-               ((!priority && !pool->jobList.head) ||
-                (priority && !pool->jobList.firstPrio))) {
-            if (!priority)
-                pool->freeWorkers++;
-            if (virCondWait(cond, &pool->mutex) < 0) {
-                if (!priority)
-                    pool->freeWorkers--;
-                goto out;
-            }
-            if (!priority)
-                pool->freeWorkers--;
-        }
-
-        if (pool->quit)
-            break;
-
-        if (priority) {
-            job = pool->jobList.firstPrio;
-        } else {
-            job = pool->jobList.head;
-        }
-
-        if (job == pool->jobList.firstPrio) {
-            virThreadPoolJobPtr tmp = job->next;
-            while (tmp) {
-                if (tmp->priority) {
-                    break;
-                }
-                tmp = tmp->next;
-            }
-            pool->jobList.firstPrio = tmp;
-        }
-
-        if (job->prev)
-            job->prev->next = job->next;
-        else
-            pool->jobList.head = job->next;
-        if (job->next)
-            job->next->prev = job->prev;
-        else
-            pool->jobList.tail = job->prev;
-
-        pool->jobQueueDepth--;
-
-        virMutexUnlock(&pool->mutex);
-        (pool->jobFunc)(job->data, pool->jobOpaque);
-        VIR_FREE(job);
-        virMutexLock(&pool->mutex);
-    }
-
-out:
-    if (priority)
-        pool->nPrioWorkers--;
-    else
-        pool->nWorkers--;
-    if (pool->nWorkers == 0 && pool->nPrioWorkers==0)
-        virCondSignal(&pool->quit_cond);
-    virMutexUnlock(&pool->mutex);
-}
-
-virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
-                                  size_t maxWorkers,
-                                  size_t prioWorkers,
-                                  virThreadPoolJobFunc func,
-                                  void *opaque)
-{
-    virThreadPoolPtr pool;
-    size_t i;
-    struct virThreadPoolWorkerData *data = NULL;
-
-    if (minWorkers > maxWorkers)
-        minWorkers = maxWorkers;
-
-    if (VIR_ALLOC(pool) < 0) {
-        virReportOOMError();
-        return NULL;
-    }
-
-    pool->jobList.tail = pool->jobList.head = NULL;
-
-    pool->jobFunc = func;
-    pool->jobOpaque = opaque;
-
-    if (virMutexInit(&pool->mutex) < 0)
-        goto error;
-    if (virCondInit(&pool->cond) < 0)
-        goto error;
-    if (virCondInit(&pool->quit_cond) < 0)
-        goto error;
-
-    if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
-        goto error;
-
-    pool->minWorkers = minWorkers;
-    pool->maxWorkers = maxWorkers;
-
-    for (i = 0; i < minWorkers; i++) {
-        if (VIR_ALLOC(data) < 0) {
-            virReportOOMError();
-            goto error;
-        }
-        data->pool = pool;
-        data->cond = &pool->cond;
-
-        if (virThreadCreate(&pool->workers[i],
-                            true,
-                            virThreadPoolWorker,
-                            data) < 0) {
-            goto error;
-        }
-        pool->nWorkers++;
-    }
-
-    if (prioWorkers) {
-        if (virCondInit(&pool->prioCond) < 0)
-            goto error;
-        if (VIR_ALLOC_N(pool->prioWorkers, prioWorkers) < 0)
-            goto error;
-
-        for (i = 0; i < prioWorkers; i++) {
-            if (VIR_ALLOC(data) < 0) {
-                virReportOOMError();
-                goto error;
-            }
-            data->pool = pool;
-            data->cond = &pool->prioCond;
-            data->priority = true;
-
-            if (virThreadCreate(&pool->prioWorkers[i],
-                                true,
-                                virThreadPoolWorker,
-                                data) < 0) {
-                goto error;
-            }
-            pool->nPrioWorkers++;
-        }
-    }
-
-    return pool;
-
-error:
-    VIR_FREE(data);
-    virThreadPoolFree(pool);
-    return NULL;
-
-}
-
-void virThreadPoolFree(virThreadPoolPtr pool)
-{
-    virThreadPoolJobPtr job;
-    bool priority = false;
-
-    if (!pool)
-        return;
-
-    virMutexLock(&pool->mutex);
-    pool->quit = true;
-    if (pool->nWorkers > 0)
-        virCondBroadcast(&pool->cond);
-    if (pool->nPrioWorkers > 0) {
-        priority = true;
-        virCondBroadcast(&pool->prioCond);
-    }
-
-    while (pool->nWorkers > 0 || pool->nPrioWorkers > 0)
-        ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
-
-    while ((job = pool->jobList.head)) {
-        pool->jobList.head = pool->jobList.head->next;
-        VIR_FREE(job);
-    }
-
-    VIR_FREE(pool->workers);
-    virMutexUnlock(&pool->mutex);
-    virMutexDestroy(&pool->mutex);
-    ignore_value(virCondDestroy(&pool->quit_cond));
-    ignore_value(virCondDestroy(&pool->cond));
-    if (priority) {
-        VIR_FREE(pool->prioWorkers);
-        ignore_value(virCondDestroy(&pool->prioCond));
-    }
-    VIR_FREE(pool);
-}
-
-
-size_t virThreadPoolGetMinWorkers(virThreadPoolPtr pool)
-{
-    return pool->minWorkers;
-}
-
-size_t virThreadPoolGetMaxWorkers(virThreadPoolPtr pool)
-{
-    return pool->maxWorkers;
-}
-
-size_t virThreadPoolGetPriorityWorkers(virThreadPoolPtr pool)
-{
-    return pool->nPrioWorkers;
-}
-
-/*
- * @priority - job priority
- * Return: 0 on success, -1 otherwise
- */
-int virThreadPoolSendJob(virThreadPoolPtr pool,
-                         unsigned int priority,
-                         void *jobData)
-{
-    virThreadPoolJobPtr job;
-    struct virThreadPoolWorkerData *data = NULL;
-
-    virMutexLock(&pool->mutex);
-    if (pool->quit)
-        goto error;
-
-    if (pool->freeWorkers - pool->jobQueueDepth <= 0 &&
-        pool->nWorkers < pool->maxWorkers) {
-        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
-            virReportOOMError();
-            goto error;
-        }
-
-        if (VIR_ALLOC(data) < 0) {
-            pool->nWorkers--;
-            virReportOOMError();
-            goto error;
-        }
-
-        data->pool = pool;
-        data->cond = &pool->cond;
-
-        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
-                            true,
-                            virThreadPoolWorker,
-                            data) < 0) {
-            VIR_FREE(data);
-            pool->nWorkers--;
-            goto error;
-        }
-    }
-
-    if (VIR_ALLOC(job) < 0) {
-        virReportOOMError();
-        goto error;
-    }
-
-    job->data = jobData;
-    job->priority = priority;
-
-    job->prev = pool->jobList.tail;
-    if (pool->jobList.tail)
-        pool->jobList.tail->next = job;
-    pool->jobList.tail = job;
-
-    if (!pool->jobList.head)
-        pool->jobList.head = job;
-
-    if (priority && !pool->jobList.firstPrio)
-        pool->jobList.firstPrio = job;
-
-    pool->jobQueueDepth++;
-
-    virCondSignal(&pool->cond);
-    if (priority)
-        virCondSignal(&pool->prioCond);
-
-    virMutexUnlock(&pool->mutex);
-    return 0;
-
-error:
-    virMutexUnlock(&pool->mutex);
-    return -1;
-}
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
deleted file mode 100644
index 4479647..0000000
--- a/src/util/threadpool.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * threadpool.h: a generic thread pool implementation
- *
- * Copyright (C) 2010 Hu Tao
- * Copyright (C) 2010 Daniel P. Berrange
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library.  If not, see
- * <http://www.gnu.org/licenses/>.
- *
- * Author:
- *     Hu Tao <hutao cn fujitsu com>
- *     Daniel P. Berrange <berrange redhat com>
- */
-
-#ifndef __VIR_THREADPOOL_H__
-# define __VIR_THREADPOOL_H__
-
-# include "internal.h"
-
-typedef struct _virThreadPool virThreadPool;
-typedef virThreadPool *virThreadPoolPtr;
-
-typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
-
-virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
-                                  size_t maxWorkers,
-                                  size_t prioWorkers,
-                                  virThreadPoolJobFunc func,
-                                  void *opaque) ATTRIBUTE_NONNULL(4);
-
-size_t virThreadPoolGetMinWorkers(virThreadPoolPtr pool);
-size_t virThreadPoolGetMaxWorkers(virThreadPoolPtr pool);
-size_t virThreadPoolGetPriorityWorkers(virThreadPoolPtr pool);
-
-void virThreadPoolFree(virThreadPoolPtr pool);
-
-int virThreadPoolSendJob(virThreadPoolPtr pool,
-                         unsigned int priority,
-                         void *jobdata) ATTRIBUTE_NONNULL(1)
-                                        ATTRIBUTE_RETURN_CHECK;
-
-#endif
diff --git a/src/util/virthreadpool.c b/src/util/virthreadpool.c
new file mode 100644
index 0000000..c13b078
--- /dev/null
+++ b/src/util/virthreadpool.c
@@ -0,0 +1,371 @@
+/*
+ * threadpool.c: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ * Copyright (C) 2010 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Authors:
+ *     Hu Tao <hutao cn fujitsu com>
+ *     Daniel P. Berrange <berrange redhat com>
+ */
+
+#include <config.h>
+
+#include "virthreadpool.h"
+#include "viralloc.h"
+#include "threads.h"
+#include "virterror_internal.h"
+
+#define VIR_FROM_THIS VIR_FROM_NONE
+
+typedef struct _virThreadPoolJob virThreadPoolJob;
+typedef virThreadPoolJob *virThreadPoolJobPtr;
+
+struct _virThreadPoolJob {
+    virThreadPoolJobPtr prev;
+    virThreadPoolJobPtr next;
+    unsigned int priority;
+
+    void *data;
+};
+
+typedef struct _virThreadPoolJobList virThreadPoolJobList;
+typedef virThreadPoolJobList *virThreadPoolJobListPtr;
+
+struct _virThreadPoolJobList {
+    virThreadPoolJobPtr head;
+    virThreadPoolJobPtr tail;
+    virThreadPoolJobPtr firstPrio;
+};
+
+
+struct _virThreadPool {
+    bool quit;
+
+    virThreadPoolJobFunc jobFunc;
+    void *jobOpaque;
+    virThreadPoolJobList jobList;
+    size_t jobQueueDepth;
+
+    virMutex mutex;
+    virCond cond;
+    virCond quit_cond;
+
+    size_t maxWorkers;
+    size_t minWorkers;
+    size_t freeWorkers;
+    size_t nWorkers;
+    virThreadPtr workers;
+
+    size_t nPrioWorkers;
+    virThreadPtr prioWorkers;
+    virCond prioCond;
+};
+
+struct virThreadPoolWorkerData {
+    virThreadPoolPtr pool;
+    virCondPtr cond;
+    bool priority;
+};
+
+static void virThreadPoolWorker(void *opaque)
+{
+    struct virThreadPoolWorkerData *data = opaque;
+    virThreadPoolPtr pool = data->pool;
+    virCondPtr cond = data->cond;
+    bool priority = data->priority;
+    virThreadPoolJobPtr job = NULL;
+
+    VIR_FREE(data);
+
+    virMutexLock(&pool->mutex);
+
+    while (1) {
+        while (!pool->quit &&
+               ((!priority && !pool->jobList.head) ||
+                (priority && !pool->jobList.firstPrio))) {
+            if (!priority)
+                pool->freeWorkers++;
+            if (virCondWait(cond, &pool->mutex) < 0) {
+                if (!priority)
+                    pool->freeWorkers--;
+                goto out;
+            }
+            if (!priority)
+                pool->freeWorkers--;
+        }
+
+        if (pool->quit)
+            break;
+
+        if (priority) {
+            job = pool->jobList.firstPrio;
+        } else {
+            job = pool->jobList.head;
+        }
+
+        if (job == pool->jobList.firstPrio) {
+            virThreadPoolJobPtr tmp = job->next;
+            while (tmp) {
+                if (tmp->priority) {
+                    break;
+                }
+                tmp = tmp->next;
+            }
+            pool->jobList.firstPrio = tmp;
+        }
+
+        if (job->prev)
+            job->prev->next = job->next;
+        else
+            pool->jobList.head = job->next;
+        if (job->next)
+            job->next->prev = job->prev;
+        else
+            pool->jobList.tail = job->prev;
+
+        pool->jobQueueDepth--;
+
+        virMutexUnlock(&pool->mutex);
+        (pool->jobFunc)(job->data, pool->jobOpaque);
+        VIR_FREE(job);
+        virMutexLock(&pool->mutex);
+    }
+
+out:
+    if (priority)
+        pool->nPrioWorkers--;
+    else
+        pool->nWorkers--;
+    if (pool->nWorkers == 0 && pool->nPrioWorkers==0)
+        virCondSignal(&pool->quit_cond);
+    virMutexUnlock(&pool->mutex);
+}
+
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  size_t prioWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque)
+{
+    virThreadPoolPtr pool;
+    size_t i;
+    struct virThreadPoolWorkerData *data = NULL;
+
+    if (minWorkers > maxWorkers)
+        minWorkers = maxWorkers;
+
+    if (VIR_ALLOC(pool) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    pool->jobList.tail = pool->jobList.head = NULL;
+
+    pool->jobFunc = func;
+    pool->jobOpaque = opaque;
+
+    if (virMutexInit(&pool->mutex) < 0)
+        goto error;
+    if (virCondInit(&pool->cond) < 0)
+        goto error;
+    if (virCondInit(&pool->quit_cond) < 0)
+        goto error;
+
+    if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
+        goto error;
+
+    pool->minWorkers = minWorkers;
+    pool->maxWorkers = maxWorkers;
+
+    for (i = 0; i < minWorkers; i++) {
+        if (VIR_ALLOC(data) < 0) {
+            virReportOOMError();
+            goto error;
+        }
+        data->pool = pool;
+        data->cond = &pool->cond;
+
+        if (virThreadCreate(&pool->workers[i],
+                            true,
+                            virThreadPoolWorker,
+                            data) < 0) {
+            goto error;
+        }
+        pool->nWorkers++;
+    }
+
+    if (prioWorkers) {
+        if (virCondInit(&pool->prioCond) < 0)
+            goto error;
+        if (VIR_ALLOC_N(pool->prioWorkers, prioWorkers) < 0)
+            goto error;
+
+        for (i = 0; i < prioWorkers; i++) {
+            if (VIR_ALLOC(data) < 0) {
+                virReportOOMError();
+                goto error;
+            }
+            data->pool = pool;
+            data->cond = &pool->prioCond;
+            data->priority = true;
+
+            if (virThreadCreate(&pool->prioWorkers[i],
+                                true,
+                                virThreadPoolWorker,
+                                data) < 0) {
+                goto error;
+            }
+            pool->nPrioWorkers++;
+        }
+    }
+
+    return pool;
+
+error:
+    VIR_FREE(data);
+    virThreadPoolFree(pool);
+    return NULL;
+
+}
+
+void virThreadPoolFree(virThreadPoolPtr pool)
+{
+    virThreadPoolJobPtr job;
+    bool priority = false;
+
+    if (!pool)
+        return;
+
+    virMutexLock(&pool->mutex);
+    pool->quit = true;
+    if (pool->nWorkers > 0)
+        virCondBroadcast(&pool->cond);
+    if (pool->nPrioWorkers > 0) {
+        priority = true;
+        virCondBroadcast(&pool->prioCond);
+    }
+
+    while (pool->nWorkers > 0 || pool->nPrioWorkers > 0)
+        ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
+
+    while ((job = pool->jobList.head)) {
+        pool->jobList.head = pool->jobList.head->next;
+        VIR_FREE(job);
+    }
+
+    VIR_FREE(pool->workers);
+    virMutexUnlock(&pool->mutex);
+    virMutexDestroy(&pool->mutex);
+    ignore_value(virCondDestroy(&pool->quit_cond));
+    ignore_value(virCondDestroy(&pool->cond));
+    if (priority) {
+        VIR_FREE(pool->prioWorkers);
+        ignore_value(virCondDestroy(&pool->prioCond));
+    }
+    VIR_FREE(pool);
+}
+
+
+size_t virThreadPoolGetMinWorkers(virThreadPoolPtr pool)
+{
+    return pool->minWorkers;
+}
+
+size_t virThreadPoolGetMaxWorkers(virThreadPoolPtr pool)
+{
+    return pool->maxWorkers;
+}
+
+size_t virThreadPoolGetPriorityWorkers(virThreadPoolPtr pool)
+{
+    return pool->nPrioWorkers;
+}
+
+/*
+ * @priority - job priority
+ * Return: 0 on success, -1 otherwise
+ */
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+                         unsigned int priority,
+                         void *jobData)
+{
+    virThreadPoolJobPtr job;
+    struct virThreadPoolWorkerData *data = NULL;
+
+    virMutexLock(&pool->mutex);
+    if (pool->quit)
+        goto error;
+
+    if (pool->freeWorkers - pool->jobQueueDepth <= 0 &&
+        pool->nWorkers < pool->maxWorkers) {
+        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
+            virReportOOMError();
+            goto error;
+        }
+
+        if (VIR_ALLOC(data) < 0) {
+            pool->nWorkers--;
+            virReportOOMError();
+            goto error;
+        }
+
+        data->pool = pool;
+        data->cond = &pool->cond;
+
+        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
+                            true,
+                            virThreadPoolWorker,
+                            data) < 0) {
+            VIR_FREE(data);
+            pool->nWorkers--;
+            goto error;
+        }
+    }
+
+    if (VIR_ALLOC(job) < 0) {
+        virReportOOMError();
+        goto error;
+    }
+
+    job->data = jobData;
+    job->priority = priority;
+
+    job->prev = pool->jobList.tail;
+    if (pool->jobList.tail)
+        pool->jobList.tail->next = job;
+    pool->jobList.tail = job;
+
+    if (!pool->jobList.head)
+        pool->jobList.head = job;
+
+    if (priority && !pool->jobList.firstPrio)
+        pool->jobList.firstPrio = job;
+
+    pool->jobQueueDepth++;
+
+    virCondSignal(&pool->cond);
+    if (priority)
+        virCondSignal(&pool->prioCond);
+
+    virMutexUnlock(&pool->mutex);
+    return 0;
+
+error:
+    virMutexUnlock(&pool->mutex);
+    return -1;
+}
diff --git a/src/util/virthreadpool.h b/src/util/virthreadpool.h
new file mode 100644
index 0000000..4479647
--- /dev/null
+++ b/src/util/virthreadpool.h
@@ -0,0 +1,53 @@
+/*
+ * threadpool.h: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ * Copyright (C) 2010 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Author:
+ *     Hu Tao <hutao cn fujitsu com>
+ *     Daniel P. Berrange <berrange redhat com>
+ */
+
+#ifndef __VIR_THREADPOOL_H__
+# define __VIR_THREADPOOL_H__
+
+# include "internal.h"
+
+typedef struct _virThreadPool virThreadPool;
+typedef virThreadPool *virThreadPoolPtr;
+
+typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
+
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  size_t prioWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque) ATTRIBUTE_NONNULL(4);
+
+size_t virThreadPoolGetMinWorkers(virThreadPoolPtr pool);
+size_t virThreadPoolGetMaxWorkers(virThreadPoolPtr pool);
+size_t virThreadPoolGetPriorityWorkers(virThreadPoolPtr pool);
+
+void virThreadPoolFree(virThreadPoolPtr pool);
+
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+                         unsigned int priority,
+                         void *jobdata) ATTRIBUTE_NONNULL(1)
+                                        ATTRIBUTE_RETURN_CHECK;
+
+#endif
-- 
1.7.11.7


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]