[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[libvirt] [PATCH 3/4] storage: Implement jobs for storage driver



When we are about to start an action that might take a long time
(allocating a new volume, wiping a volume, etc.), increment the
counter of jobs in the pool object and unlock the pool. We don't want
to hold the pool locked during long-running actions.
---
 src/conf/storage_conf.c      |   12 ++
 src/conf/storage_conf.h      |   22 +++-
 src/libvirt_private.syms     |    1 +
 src/storage/storage_driver.c |  284 +++++++++++++++++++++++++++++++-----------
 4 files changed, 246 insertions(+), 73 deletions(-)

diff --git a/src/conf/storage_conf.c b/src/conf/storage_conf.c
index bdf6218..e378ceb 100644
--- a/src/conf/storage_conf.c
+++ b/src/conf/storage_conf.c
@@ -251,6 +251,8 @@ virStorageVolDefFree(virStorageVolDefPtr def) {
     if (!def)
         return;
 
+    virMutexDestroy(&def->lock);
+    ignore_value(virCondDestroy(&def->job.cond));
     VIR_FREE(def->name);
     VIR_FREE(def->key);
 
@@ -978,6 +980,16 @@ virStorageVolDefParseXML(virStoragePoolDefPtr pool,
         return NULL;
     }
 
+    if (virMutexInit(&ret->lock) < 0) {
+        virReportSystemError(errno, "%s", _("Cannot init mutex"));
+        goto cleanup;
+    }
+
+    if (virCondInit(&ret->job.cond) < 0) {
+        virReportSystemError(errno, "%s", _("Cannot init cond"));
+        goto cleanup;
+    }
+
     ret->name = virXPathString("string(./name)", ctxt);
     if (ret->name == NULL) {
         virStorageReportError(VIR_ERR_XML_ERROR,
diff --git a/src/conf/storage_conf.h b/src/conf/storage_conf.h
index 1ef9295..481c806 100644
--- a/src/conf/storage_conf.h
+++ b/src/conf/storage_conf.h
@@ -83,6 +83,25 @@ struct _virStorageVolTarget {
 };
 
 
+enum virStorageVolJob {
+    VIR_STORAGE_VOL_JOB_NONE = 0, /* no job */
+    VIR_STORAGE_VOL_JOB_BUILD,
+    VIR_STORAGE_VOL_JOB_DELETE,
+    VIR_STORAGE_VOL_JOB_REFRESH,
+    VIR_STORAGE_VOL_JOB_WIPE,
+    VIR_STORAGE_VOL_JOB_RESIZE,
+    VIR_STORAGE_VOL_JOB_DOWNLOAD,
+    VIR_STORAGE_VOL_JOB_UPLOAD,
+
+    VIR_STORAGE_VOL_JOB_LAST
+};
+
+struct virStorageVolJobObj {
+    virCond cond;
+    enum virStorageVolJob active;
+    unsigned long long start;
+};
+
 typedef struct _virStorageVolDef virStorageVolDef;
 typedef virStorageVolDef *virStorageVolDefPtr;
 struct _virStorageVolDef {
@@ -90,7 +109,8 @@ struct _virStorageVolDef {
     char *key;
     int type; /* virStorageVolType enum */
 
-    unsigned int building;
+    virMutex lock;
+    struct virStorageVolJobObj job;
 
     unsigned long long allocation; /* bytes */
     unsigned long long capacity; /* bytes */
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index 1f55f5d..fef9d5a 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -565,6 +565,7 @@ virFDStreamOpen;
 virFDStreamConnectUNIX;
 virFDStreamOpenFile;
 virFDStreamCreateFile;
+virFDStreamSetInternalCloseCb;
 
 
 # hash.h
diff --git a/src/storage/storage_driver.c b/src/storage/storage_driver.c
index 66811ce..7f3dfcd 100644
--- a/src/storage/storage_driver.c
+++ b/src/storage/storage_driver.c
@@ -48,6 +48,7 @@
 #include "virfile.h"
 #include "fdstream.h"
 #include "configmake.h"
+#include "virtime.h"
 
 #define VIR_FROM_THIS VIR_FROM_STORAGE
 
@@ -65,6 +66,111 @@ static void storageDriverUnlock(virStorageDriverStatePtr driver)
 }
 
 static void
+storageResetJob(virStorageVolDefPtr vol)
+{
+    struct virStorageVolJobObj *job = &vol->job;
+
+    job->active = VIR_STORAGE_VOL_JOB_NONE;
+    job->start = 0;
+}
+
+/* wait at most 30 seconds to acquire the job */
+#define STORAGE_JOB_WAIT_TIME (1000ull * 30)
+
+static int
+storageBeginJobInternal(virStorageDriverStatePtr driver ATTRIBUTE_UNUSED,
+                        virStoragePoolObjPtr pool,
+                        bool pool_locked,
+                        virStorageVolDefPtr vol,
+                        enum virStorageVolJob job)
+{
+    unsigned long long now;
+    unsigned long long then;
+
+    if (virTimeMillisNow(&now) < 0)
+        return -1;
+
+    then = now + STORAGE_JOB_WAIT_TIME;
+
+    while (vol->job.active != VIR_STORAGE_VOL_JOB_NONE) {
+        if (virCondWaitUntil(&vol->job.cond, &vol->lock, then) < 0)
+            goto error;
+    }
+
+    VIR_DEBUG("Starting job %d", job);
+    storageResetJob(vol);
+    vol->job.active = job;
+    vol->job.start = now;
+
+    if (pool_locked) {
+        pool->asyncjobs++;
+        virStoragePoolObjUnlock(pool);
+    }
+
+    return 0;
+
+error:
+    if (errno == ETIMEDOUT)
+        virStorageReportError(VIR_ERR_OPERATION_TIMEOUT, "%s",
+                              _("cannot acquire state change lock"));
+    else
+        virReportSystemError(errno, "%s",
+                             _("cannot acquire job mutex"));
+    if (pool_locked)
+        virStoragePoolObjLock(pool);
+    return -1;
+}
+
+static int
+storageBeginJob(virStorageDriverStatePtr driver,
+                virStorageVolDefPtr vol,
+                enum virStorageVolJob job)
+{
+    return storageBeginJobInternal(driver, NULL, false, vol, job);
+}
+
+static int
+storageBeginJobWithPool(virStorageDriverStatePtr driver,
+                        virStoragePoolObjPtr pool,
+                        virStorageVolDefPtr vol,
+                        enum virStorageVolJob job)
+{
+    return storageBeginJobInternal(driver, pool, true, vol, job);
+}
+
+static void
+storageEndJobInternal(virStorageDriverStatePtr driver,
+                      virStoragePoolObjPtr pool,
+                      bool pool_locked,
+                      virStorageVolDefPtr vol)
+{
+    VIR_DEBUG("Stopping job %d", vol->job.active);
+    storageResetJob(vol);
+    if (pool_locked) {
+        storageDriverLock(driver);
+        virStoragePoolObjLock(pool);
+        storageDriverUnlock(driver);
+        pool->asyncjobs--;
+    }
+    virCondBroadcast(&vol->job.cond);
+}
+
+static void
+storageEndJob(virStorageDriverStatePtr driver,
+              virStorageVolDefPtr vol)
+{
+    return storageEndJobInternal(driver, NULL, false, vol);
+}
+
+static void
+storageEndJobWithPool(virStorageDriverStatePtr driver,
+                      virStoragePoolObjPtr pool,
+                      virStorageVolDefPtr vol)
+{
+    return storageEndJobInternal(driver, pool, true, vol);
+}
+
+static void
 storageDriverAutostart(virStorageDriverStatePtr driver) {
     unsigned int i;
 
@@ -1365,18 +1471,16 @@ storageVolumeCreateXML(virStoragePoolPtr obj,
         memcpy(buildvoldef, voldef, sizeof(*voldef));
 
         /* Drop the pool lock during volume allocation */
-        pool->asyncjobs++;
-        voldef->building = 1;
-        virStoragePoolObjUnlock(pool);
+        if (storageBeginJobWithPool(driver, pool, voldef,
+                                    VIR_STORAGE_VOL_JOB_BUILD) < 0) {
+            VIR_FREE(buildvoldef);
+            goto cleanup;
+        }
 
         buildret = backend->buildVol(obj->conn, pool, buildvoldef);
 
-        storageDriverLock(driver);
-        virStoragePoolObjLock(pool);
-        storageDriverUnlock(driver);
 
-        voldef->building = 0;
-        pool->asyncjobs--;
+        storageEndJobWithPool(driver, pool, voldef);
 
         voldef = NULL;
         VIR_FREE(buildvoldef);
@@ -1416,7 +1520,7 @@ storageVolumeCreateXMLFrom(virStoragePoolPtr obj,
     virStorageBackendPtr backend;
     virStorageVolDefPtr origvol = NULL, newvol = NULL;
     virStorageVolPtr ret = NULL, volobj = NULL;
-    int buildret;
+    int buildret = -1;
 
     virCheckFlags(0, NULL);
 
@@ -1490,17 +1594,21 @@ storageVolumeCreateXMLFrom(virStoragePoolPtr obj,
         goto cleanup;
     }
 
-    if (origvol->building) {
-        virStorageReportError(VIR_ERR_OPERATION_INVALID,
-                              _("volume '%s' is still being allocated."),
-                              origvol->name);
-        goto cleanup;
+    if (backend->refreshVol) {
+        int refreshVolRet;
+
+        if (storageBeginJobWithPool(driver, pool, origvol,
+                                    VIR_STORAGE_VOL_JOB_REFRESH) < 0)
+            goto cleanup;
+
+        refreshVolRet =  backend->refreshVol(obj->conn, pool, origvol);
+
+        storageEndJobWithPool(driver, pool, origvol);
+
+        if (refreshVolRet < 0)
+            goto cleanup;
     }
 
-    if (backend->refreshVol &&
-        backend->refreshVol(obj->conn, pool, origvol) < 0)
-        goto cleanup;
-
     if (VIR_REALLOC_N(pool->volumes.objs,
                       pool->volumes.count+1) < 0) {
         virReportOOMError();
@@ -1517,34 +1625,23 @@ storageVolumeCreateXMLFrom(virStoragePoolPtr obj,
                               newvol->key);
 
     /* Drop the pool lock during volume allocation */
-    pool->asyncjobs++;
-    origvol->building = 1;
-    newvol->building = 1;
-    virStoragePoolObjUnlock(pool);
+    if (storageBeginJobWithPool(driver, pool, newvol, VIR_STORAGE_VOL_JOB_BUILD) < 0)
+        goto cleanup;
 
-    if (origpool) {
-        origpool->asyncjobs++;
-        virStoragePoolObjUnlock(origpool);
-    }
+    if ((origpool && storageBeginJobWithPool(driver, origpool, origvol, VIR_STORAGE_VOL_JOB_BUILD) < 0) ||
+        (!origpool && storageBeginJob(driver, origvol, VIR_STORAGE_VOL_JOB_BUILD) < 0))
+        goto endjob;
 
     buildret = backend->buildVolFrom(obj->conn, pool, newvol, origvol, flags);
 
-    storageDriverLock(driver);
-    virStoragePoolObjLock(pool);
-    if (origpool)
-        virStoragePoolObjLock(origpool);
-    storageDriverUnlock(driver);
-
-    origvol->building = 0;
-    newvol->building = 0;
     newvol = NULL;
-    pool->asyncjobs--;
 
-    if (origpool) {
-        origpool->asyncjobs--;
-        virStoragePoolObjUnlock(origpool);
-        origpool = NULL;
-    }
+    if (origpool)
+        storageEndJobWithPool(driver, origpool, origvol);
+    else
+        storageEndJob(driver, origvol);
+endjob:
+    storageEndJobWithPool(driver, pool, newvol);
 
     if (buildret < 0) {
         virStoragePoolObjUnlock(pool);
@@ -1569,6 +1666,23 @@ cleanup:
     return ret;
 }
 
+struct storageVolumeStreamData {
+    virStorageDriverStatePtr driver;
+    virStorageVolDefPtr vol;
+};
+
+static void
+storageVolumeStreamEndJob(virStreamPtr stream ATTRIBUTE_UNUSED,
+                          void *opaque)
+{
+    struct storageVolumeStreamData *data = opaque;
+    virStorageDriverStatePtr driver = data->driver;
+    virStorageVolDefPtr vol = data->vol;
+
+    VIR_FREE(data);
+
+    storageEndJob(driver, vol);
+}
 
 static int
 storageVolumeDownload(virStorageVolPtr obj,
@@ -1580,6 +1694,7 @@ storageVolumeDownload(virStorageVolPtr obj,
     virStorageDriverStatePtr driver = obj->conn->storagePrivateData;
     virStoragePoolObjPtr pool = NULL;
     virStorageVolDefPtr vol = NULL;
+    struct storageVolumeStreamData *stream_data = NULL;
     int ret = -1;
 
     virCheckFlags(0, -1);
@@ -1609,21 +1724,34 @@ storageVolumeDownload(virStorageVolPtr obj,
         goto out;
     }
 
-    if (vol->building) {
-        virStorageReportError(VIR_ERR_OPERATION_INVALID,
-                              _("volume '%s' is still being allocated."),
-                              vol->name);
+    if (storageBeginJob(driver, vol, VIR_STORAGE_VOL_JOB_DOWNLOAD) < 0)
         goto out;
-    }
 
     if (virFDStreamOpenFile(stream,
                             vol->target.path,
                             offset, length,
                             O_RDONLY) < 0)
-        goto out;
+        goto endjob;
+
+    if (VIR_ALLOC(stream_data) < 0) {
+        virReportOOMError();
+        goto endjob;
+    }
+
+    stream_data->driver = driver;
+    stream_data->vol = vol;
+
+    if (virFDStreamSetInternalCloseCb(stream,
+                                      storageVolumeStreamEndJob,
+                                      stream_data, NULL) < 0)
+        goto endjob;
 
     ret = 0;
+    goto out;
 
+endjob:
+    storageEndJob(driver, vol);
+    VIR_FREE(stream_data);
 out:
     if (pool)
         virStoragePoolObjUnlock(pool);
@@ -1642,6 +1770,7 @@ storageVolumeUpload(virStorageVolPtr obj,
     virStorageDriverStatePtr driver = obj->conn->storagePrivateData;
     virStoragePoolObjPtr pool = NULL;
     virStorageVolDefPtr vol = NULL;
+    struct storageVolumeStreamData *stream_data = NULL;
     int ret = -1;
 
     virCheckFlags(0, -1);
@@ -1671,12 +1800,8 @@ storageVolumeUpload(virStorageVolPtr obj,
         goto out;
     }
 
-    if (vol->building) {
-        virStorageReportError(VIR_ERR_OPERATION_INVALID,
-                              _("volume '%s' is still being allocated."),
-                              vol->name);
+    if (storageBeginJob(driver, vol, VIR_STORAGE_VOL_JOB_DOWNLOAD) < 0)
         goto out;
-    }
 
     /* Not using O_CREAT because the file is required to
      * already exist at this point */
@@ -1684,10 +1809,27 @@ storageVolumeUpload(virStorageVolPtr obj,
                             vol->target.path,
                             offset, length,
                             O_WRONLY) < 0)
-        goto out;
+        goto endjob;
+
+    if (VIR_ALLOC(stream_data) < 0) {
+        virReportOOMError();
+        goto endjob;
+    }
+
+    stream_data->driver = driver;
+    stream_data->vol = vol;
+
+    if (virFDStreamSetInternalCloseCb(stream,
+                                      storageVolumeStreamEndJob,
+                                      stream_data, NULL) < 0)
+        goto endjob;
 
     ret = 0;
+    goto out;
 
+endjob:
+    storageEndJob(driver, vol);
+    VIR_FREE(stream_data);
 out:
     if (pool)
         virStoragePoolObjUnlock(pool);
@@ -1737,13 +1879,6 @@ storageVolumeResize(virStorageVolPtr obj,
         goto out;
     }
 
-    if (vol->building) {
-        virStorageReportError(VIR_ERR_OPERATION_INVALID,
-                              _("volume '%s' is still being allocated."),
-                              vol->name);
-        goto out;
-    }
-
     if (flags & VIR_STORAGE_VOL_RESIZE_DELTA) {
         abs_capacity = vol->capacity + capacity;
         flags &= ~VIR_STORAGE_VOL_RESIZE_DELTA;
@@ -1771,9 +1906,14 @@ storageVolumeResize(virStorageVolPtr obj,
         goto out;
     }
 
+    if (storageBeginJobWithPool(driver, pool, vol, VIR_STORAGE_VOL_JOB_RESIZE) < 0)
+        goto out;
+
     if (backend->resizeVol(obj->conn, pool, vol, abs_capacity, flags) < 0)
         goto out;
 
+    storageEndJobWithPool(driver, pool, vol);
+
    vol->capacity = abs_capacity;
    ret = 0;
 
@@ -2028,17 +2168,16 @@ storageVolumeWipePattern(virStorageVolPtr obj,
         goto out;
     }
 
-    if (vol->building) {
-        virStorageReportError(VIR_ERR_OPERATION_INVALID,
-                              _("volume '%s' is still being allocated."),
-                              vol->name);
+    if (storageBeginJobWithPool(driver, pool, vol, VIR_STORAGE_VOL_JOB_WIPE) < 0)
         goto out;
-    }
 
     if (storageVolumeWipeInternal(vol, algorithm) == -1) {
+        storageEndJobWithPool(driver, pool, vol);
         goto out;
     }
 
+    storageEndJobWithPool(driver, pool, vol);
+
     ret = 0;
 
 out:
@@ -2066,6 +2205,7 @@ storageVolumeDelete(virStorageVolPtr obj,
     virStorageVolDefPtr vol = NULL;
     unsigned int i;
     int ret = -1;
+    int deleteVolRet;
 
     storageDriverLock(driver);
     pool = virStoragePoolObjFindByName(&driver->pools, obj->pool);
@@ -2095,13 +2235,6 @@ storageVolumeDelete(virStorageVolPtr obj,
         goto cleanup;
     }
 
-    if (vol->building) {
-        virStorageReportError(VIR_ERR_OPERATION_INVALID,
-                              _("volume '%s' is still being allocated."),
-                              vol->name);
-        goto cleanup;
-    }
-
     if (!backend->deleteVol) {
         virStorageReportError(VIR_ERR_NO_SUPPORT,
                               "%s", _("storage pool does not support vol deletion"));
@@ -2109,7 +2242,14 @@ storageVolumeDelete(virStorageVolPtr obj,
         goto cleanup;
     }
 
-    if (backend->deleteVol(obj->conn, pool, vol, flags) < 0)
+    if (storageBeginJobWithPool(driver, pool, vol,
+                                VIR_STORAGE_VOL_JOB_DELETE) < 0)
+        goto cleanup;
+
+    deleteVolRet = backend->deleteVol(obj->conn, pool, vol, flags);
+
+    storageEndJobWithPool(driver, pool, vol);
+    if (deleteVolRet < 0)
         goto cleanup;
 
     for (i = 0 ; i < pool->volumes.count ; i++) {
-- 
1.7.8.5


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]