[Ovirt-devel] [PATCH server] Taskomatic Refactoring and Qpidification Take 2

Ian Main imain at redhat.com
Fri Dec 19 06:08:39 UTC 2008


This is a repost of the new patch in its entirety, including a number
of new bugfixes from this evening.  If most folks think the indentation
should go to 2 spaces, let me know; I'm not really opposed to that.

This patch reworks taskomatic quite a bit.  This mostly just shifts
taskomatic to using the qpid interface in place of ruby-libvirt.  It
also fixes a few bugs I discovered along the way and adds new ones,
I'm sure.  The only other thing added was round-robin host selection
for VMs.
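
For anyone unfamiliar with the idea, round-robin selection can be
sketched roughly as below.  This is only an illustration, not the code
in the patch: the RoundRobinSelector class, its pick method and the
suitability block are all hypothetical names made up for the example.

```ruby
# Hypothetical sketch of round-robin selection; none of these names
# come from taskomatic itself.
class RoundRobinSelector
    def initialize
        # Index of the host handed out last time; -1 means "none yet".
        @last_index = -1
    end

    # Returns the next suitable host after the previously chosen one,
    # wrapping around the list.  The optional block decides whether a
    # given host is suitable (enough memory, enabled, and so on).
    def pick(hosts)
        return nil if hosts.empty?

        hosts.size.times do |offset|
            idx = (@last_index + 1 + offset) % hosts.size
            host = hosts[idx]
            if !block_given? || yield(host)
                @last_index = idx
                return host
            end
        end

        # Every host was rejected by the block.
        nil
    end
end
```

Calling pick repeatedly on the same list walks through the hosts in
order and skips any that the block rejects, so new VMs get spread
across hosts instead of piling up on the first match.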

Wherever possible the hosts are queried directly using qpid rather than
relying on states from the database.

This patch loses about 150 lines from the original taskomatic and moves
most of the task implementation into a central class.  This was done to
provide access to the qpid session as well as providing for locking/task
ordering in future versions.
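
As a rough picture of the shape this takes (a simplified sketch only;
the TaskRunner class, the task_ prefix and the start_vm handler below
are hypothetical and not the actual names in taskomatic.rb), the task
methods live on one object that owns the session:

```ruby
# Hypothetical sketch: one class owns the session and dispatches tasks.
class TaskRunner
    def initialize(session)
        # Shared by every task method, so none of them has to open
        # its own connection.
        @session = session
    end

    # Single entry point for all tasks.  Locking or task ordering
    # could later be added here, around the dispatch.
    def run(action, *args)
        handler = "task_#{action}"
        unless respond_to?(handler, true)
            raise ArgumentError, "unknown task action: #{action}"
        end
        send(handler, *args)
    end

    private

    # Stand-in task body; a real one would talk to the node through
    # @session.
    def task_start_vm(vm_name)
        "start #{vm_name} via #{@session}"
    end
end
```

Because every task goes through run, that is the one place that would
need to change to serialize or reorder tasks.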

This requires the latest libvirt-qpid (0.2.8) as it fixes a number of
bugs.  It's in the ovirt repository now.

Issues remaining:

- libvirt-qpid migrate is broken.  Since the migrate takes place on the
  node instead of from the ovirt-appliance, the source node doesn't have
  the ability to authenticate against the destination node.  For this
  reason I'm still using ruby-libvirt migrate.  I talked to Chris about
  this and we have a plan worked out. :)

- I wanted to get threading into this but that will have to wait.  I'll
  post a thread about this to get the discussion started again.  I think
  the refactoring allows this to be put in pretty easily.

Signed-off-by: Ian Main <imain at redhat.com>
---
 src/task-omatic/task_host.rb    |   33 --
 src/task-omatic/task_storage.rb |  438 ++++++++-----------
 src/task-omatic/task_vm.rb      |  708 ++++---------------------------
 src/task-omatic/taskomatic.rb   |  915 ++++++++++++++++++++++++++++++++++-----
 src/task-omatic/utils.rb        |  221 ----------
 5 files changed, 1065 insertions(+), 1250 deletions(-)
 delete mode 100644 src/task-omatic/task_host.rb
 delete mode 100644 src/task-omatic/utils.rb

diff --git a/src/task-omatic/task_host.rb b/src/task-omatic/task_host.rb
deleted file mode 100644
index 3d039fb..0000000
--- a/src/task-omatic/task_host.rb
+++ /dev/null
@@ -1,33 +0,0 @@
-# Copyright (C) 2008 Red Hat, Inc.
-# Written by Chris Lalancette <clalance at redhat.com>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; version 2 of the License.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
-# MA  02110-1301, USA.  A copy of the GNU General Public License is
-# also available at http://www.gnu.org/copyleft/gpl.html.
-
-require 'utils'
-
-# FIXME: a little ugly to be including all of task_vm here, but
-# utils really isn't the right place for the migrate() method
-require 'task_vm'
-
-def clear_vms_host(task)
-  puts "clear_vms_host"
-
-  src_host = task.host
-
-  src_host.vms.each do |vm|
-    migrate(vm)
-  end
-end
diff --git a/src/task-omatic/task_storage.rb b/src/task-omatic/task_storage.rb
index 19800fb..7167f52 100644
--- a/src/task-omatic/task_storage.rb
+++ b/src/task-omatic/task_storage.rb
@@ -16,287 +16,227 @@
 # MA  02110-1301, USA.  A copy of the GNU General Public License is
 # also available at http://www.gnu.org/copyleft/gpl.html.
 
-require 'utils'
-
 require 'libvirt'
+require 'rexml/document'
+include REXML
 
-def add_volumes_to_db(db_pool, libvirt_pool, owner = nil, group = nil, mode = nil)
-  # FIXME: this is currently broken if you do something like:
-  # 1.  Add an iscsi pool with 3 volumes (lun-1, lun-2, lun-3)
-  # 2.  Scan it in
-  # 3.  Remove lun-3 from the pool
-  # 4.  Re-scan it
-  # What will happen is that you will still have lun-3 available in the
-  # database, even though it's not available in the pool anymore.  It's a
-  # little tricky, though; we have to make sure that we don't pull the
-  # database entry out from underneath a possibly running VM (or do we?)
-  libvirt_pool.list_volumes.each do |volname|
-    storage_volume = StorageVolume.factory(db_pool.get_type_label)
-
-    # NOTE: it is safe (and, in fact, necessary) to use
-    # #{storage_volume.volume_name} here without sanitizing it.  This is
-    # because this is *not* based on user modifiable data, but rather, on an
-    # internal implementation detail
-    existing_vol = StorageVolume.find(:first, :conditions =>
-                                      [ "storage_pool_id = ? AND #{storage_volume.volume_name} = ?",
-                                        db_pool.id, volname])
-    if existing_vol != nil
-      # in this case, this path already exists in the database; just skip
-      next
-    end
+def String.random_alphanumeric(size=16)
+    s = ""
+    size.times { s << (i = Kernel.rand(62); i += ((i < 10) ? 48 : ((i < 36) ? 55 : 61 ))).chr }
+    s
+end
 
-    volptr = libvirt_pool.lookup_vol_by_name(volname)
-
-    volinfo = volptr.info
-
-    storage_volume = StorageVolume.factory(db_pool.get_type_label)
-    storage_volume.path = volptr.path
-    storage_volume.size = volinfo.capacity / 1024
-    storage_volume.storage_pool_id = db_pool.id
-    storage_volume.write_attribute(storage_volume.volume_name, volname)
-    storage_volume.lv_owner_perms = owner
-    storage_volume.lv_group_perms = group
-    storage_volume.lv_mode_perms = mode
-    storage_volume.state = StorageVolume::STATE_AVAILABLE
-    storage_volume.save!
-  end
+def get_libvirt_lvm_pool_from_volume(db_volume)
+    phys_volume = StorageVolume.find(:first, :conditions =>
+                                     ["lvm_pool_id = ?", db_volume.storage_pool_id])
+
+    return LibvirtPool.factory(phys_volume.storage_pool)
 end
 
-def storage_find_suitable_host(hardware_pool)
-  conn = nil
-  hardware_pool.hosts.each do |host|
-    if not host.is_disabled.nil? and host.is_disabled == 0 \
-      and host.state == Host::STATE_AVAILABLE
-      begin
-        # FIXME: this can hang up taskomatic for quite some time.  To see how,
-        # make one of your remote servers do "iptables -I INPUT -j DROP"
-        # and then try to run this; it will take TCP quite a while to give up.
-        # Unfortunately the solution is probably to do some sort of threading
-        conn = Libvirt::open("qemu+tcp://" + host.hostname + "/system")
-
-        # if we didn't raise an exception, we connected; get out of here
-        break
-      rescue Libvirt::ConnectionError
-        # if we couldn't connect for whatever reason, just try the next host
-        next
-      end
-    end
-  end
-
-  if conn == nil
-    # last ditch effort; if we didn't find any hosts, just use ourselves.
-    # this may or may not work
-    begin
-      conn = Libvirt::open("qemu:///system")
-    rescue
-    end
-  end
+class LibvirtPool
 
-  if conn == nil
-    raise "Could not find a host to scan storage"
-  end
+    attr_reader :remote_pool
 
-  return conn
-end
+    def initialize(type, name = nil)
+        @remote_pool = nil
+        @build_on_start = true
+        @remote_pool_defined = false
+        @remote_pool_started = false
 
-# The words "pool" and "volume" are ridiculously overloaded in our context.
-# Therefore, the refresh_pool method adopts this convention:
-# phys_db_pool: The underlying physical storage pool, as it is represented in
-#               the database
-# phys_libvirt_pool: The underlying physical storage, as it is represented in
-#                    libvirt
-# lvm_db_pool: The logical storage pool (if it exists), as it is represented
-#              in the database
-# lvm_libvirt_pool: The logical storage pool (if it exists), as it is
-#                   represented in the database
-
-def refresh_pool(task)
-  puts "refresh_pool"
-
-  phys_db_pool = task.storage_pool
-  if phys_db_pool == nil
-    raise "Could not find storage pool"
-  end
-
-  conn = storage_find_suitable_host(phys_db_pool.hardware_pool)
-
-  begin
-    phys_libvirt_pool = LibvirtPool.factory(phys_db_pool)
-    phys_libvirt_pool.connect(conn)
-
-    begin
-      # OK, the pool is all set.  Add in all of the volumes
-      add_volumes_to_db(phys_db_pool, phys_libvirt_pool)
-
-      phys_db_pool.state = StoragePool::STATE_AVAILABLE
-      phys_db_pool.save!
-
-      # OK, now we've scanned the underlying hardware pool and added the
-      # volumes.  Next we scan for pre-existing LVM volumes
-      logical_xml = conn.discover_storage_pool_sources("logical")
-
-      Document.new(logical_xml).elements.each('sources/source') do |source|
-        vgname = source.elements["name"].text
-
-        begin
-          source.elements.each("device") do |device|
-            byid_device = phys_libvirt_pool.lookup_vol_by_path(device.attributes["path"]).path
-          end
-        rescue
-          # If matching any of the <device> sections in the LVM XML fails
-          # against the storage pool, then it is likely that this is a storage
-          # pool not associated with the one we connected above.  Go on
-          # FIXME: it would be nicer to catch the right exception here, and
-          # fail on other exceptions
-          puts "One of the logical volumes in #{vgname} is not part of the pool of type #{phys_db_pool[:type]} that we are scanning; ignore the previous error!"
-          next
+        if name == nil
+            @name = type + "-" + String.random_alphanumeric
+        else
+            @name = name
         end
 
-        # if we make it here, then we were able to resolve all of the devices,
-        # so we know we need to use a new pool
-        lvm_db_pool = LvmStoragePool.find(:first, :conditions =>
-                                          [ "vg_name = ?", vgname ])
-        if lvm_db_pool == nil
-          lvm_db_pool = LvmStoragePool.new
-          lvm_db_pool[:type] = "LvmStoragePool"
-          # set the LVM pool to the same hardware pool as the underlying storage
-          lvm_db_pool.hardware_pool_id = phys_db_pool.hardware_pool_id
-          lvm_db_pool.vg_name = vgname
-          lvm_db_pool.save!
+        @xml = Document.new
+        @xml.add_element("pool", {"type" => type})
+
+        @xml.root.add_element("name").add_text(@name)
+
+        @xml.root.add_element("source")
+
+        @xml.root.add_element("target")
+        @xml.root.elements["target"].add_element("path")
+    end
+
+    def connect(session, node)
+        pools = session.objects(:class => 'pool', 'node' => node.object_id)
+        pools.each do |pool|
+            result = pool.getXMLDesc
+            raise "Error getting xml description of pool: #{result.text}" unless result.status == 0
+
+            xml_desc = result.description
+            if self.xmlequal?(Document.new(xml_desc).root)
+                @remote_pool = pool
+                break
+            end
         end
 
-        source.elements.each("device") do |device|
-          byid_device = phys_libvirt_pool.lookup_vol_by_path(device.attributes["path"]).path
-          physical_vol = StorageVolume.find(:first, :conditions =>
-                                            [ "path = ?",  byid_device])
-          if physical_vol == nil
-            # Hm. We didn't find the device in the storage volumes already.
-            # something went wrong internally, and we have to bail
-            raise "Storage internal physical volume error"
-          end
-
-          # OK, put the right lvm_pool_id in place
-          physical_vol.lvm_pool_id = lvm_db_pool.id
-          physical_vol.save!
+        #XXX: I'm not sure.. it seems like there could be other things going on
+        #     with the storage pool state.  State can be inactive, building, running
+        #     or degraded.  I think some more thought should go here to make sure
+        #     we're doing things right in each state.
+        if @remote_pool == nil
+            result = node.storagePoolDefineXML(@xml.to_s)
+            raise "Error creating pool: #{result.text}" unless result.status == 0
+            @remote_pool = session.object(:object_id => result.pool)
+            raise "Error finding newly created remote pool." unless @remote_pool
+
+            # we need this because we don't want to "build" LVM pools, which would
+            # destroy existing data
+            if @build_on_start
+                result = @remote_pool.build
+                raise "Error building pool: #{result.text}" unless result.status == 0
+            end
+            @remote_pool_defined = true
         end
 
-        lvm_libvirt_pool = LibvirtPool.factory(lvm_db_pool)
-        lvm_libvirt_pool.connect(conn)
+        if @remote_pool.state == "inactive"
+            # only try to start the pool if it is currently inactive; in all other
+            # states, assume it is already running
+            result = @remote_pool.create
+            raise "Error creating pool: #{result.text}" unless result.status == 0
 
-        begin
-          add_volumes_to_db(lvm_db_pool, lvm_libvirt_pool, "0744", "0744", "0744")
-        ensure
-          lvm_libvirt_pool.shutdown
+            # Refresh qpid object with new properties.
+            @remote_pool.update
+
+            @remote_pool_started = true
+        end
+    end
+
+    def create_vol(type, name, size, owner, group, mode)
+        @vol_xml = Document.new
+        @vol_xml.add_element("volume", {"type" => type})
+        @vol_xml.root.add_element("name").add_text(name)
+        @vol_xml.root.add_element("capacity", {"unit" => "K"}).add_text(size.to_s)
+        @vol_xml.root.add_element("target")
+        @vol_xml.root.elements["target"].add_element("permissions")
+        @vol_xml.root.elements["target"].elements["permissions"].add_element("owner").add_text(owner)
+        @vol_xml.root.elements["target"].elements["permissions"].add_element("group").add_text(group)
+        @vol_xml.root.elements["target"].elements["permissions"].add_element("mode").add_text(mode)
+    end
+
+    def shutdown
+        if @remote_pool_started
+            result = @remote_pool.destroy
+        end
+        if @remote_pool_defined
+            result = @remote_pool.undefine
         end
-      end
-    ensure
-      phys_libvirt_pool.shutdown
     end
-  ensure
-    conn.close
-  end
-end
 
-def create_volume(task)
-  puts "create_volume"
+    def xmlequal?(docroot)
+        return false
+    end
 
-  db_volume = task.storage_volume
-  if db_volume == nil
-    raise "Could not find storage volume to create"
-  end
+    def self.factory(pool)
+        if pool[:type] == "IscsiStoragePool"
+            return IscsiLibvirtPool.new(pool.ip_addr, pool[:target])
+        elsif pool[:type] == "NfsStoragePool"
+            return NFSLibvirtPool.new(pool.ip_addr, pool.export_path)
+        elsif pool[:type] == "LvmStoragePool"
+            # OK, if this is LVM storage, there are two cases we need to care about:
+            # 1) this is a LUN with LVM already on it.  In this case, all we need to
+            #    do is to create a new LV (== libvirt volume), and be done with it
+            # 2) this LUN is blank, so there is no LVM on it already.  In this
+            #    case, we need to pvcreate, vgcreate first (== libvirt pool build),
+            #    and *then* create the new LV (== libvirt volume) on top of that.
+            #
+            # We can tell the difference between an LVM Pool that exists and one
+            # that needs to be created based on the value of the pool.state;
+            # if it is PENDING_SETUP, we need to create it first
+            phys_volume = StorageVolume.find(:first, :conditions =>
+                                             [ "lvm_pool_id = ?", pool.id])
+            return LVMLibvirtPool.new(pool.vg_name, phys_volume.path,
+                                      pool.state == StoragePool::STATE_PENDING_SETUP)
+        else
+            raise "Unknown storage pool type " + pool[:type].to_s
+        end
+    end
+end
+
+class IscsiLibvirtPool < LibvirtPool
+    def initialize(ip_addr, target)
+        super('iscsi')
 
-  db_pool = db_volume.storage_pool
-  if db_pool == nil
-    raise "Could not find storage pool"
-  end
+        @type = 'iscsi'
+        @ipaddr = ip_addr
+        @target = target
 
-  conn = storage_find_suitable_host(db_pool.hardware_pool)
+        @xml.root.elements["source"].add_element("host", {"name" => @ipaddr})
+        @xml.root.elements["source"].add_element("device", {"path" => @target})
 
-  begin
-    if db_volume[:type] == "LvmStorageVolume"
-      phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
-      phys_libvirt_pool.connect(conn)
+        @xml.root.elements["target"].elements["path"].text = "/dev/disk/by-id"
     end
 
-    begin
-      libvirt_pool = LibvirtPool.factory(db_pool)
-
-      begin
-        libvirt_pool.connect(conn)
-
-        libvirt_pool.create_vol(*db_volume.volume_create_params)
-        db_volume.state = StorageVolume::STATE_AVAILABLE
-        db_volume.save!
-
-        db_pool.state = StoragePool::STATE_AVAILABLE
-        db_pool.save!
-      ensure
-        libvirt_pool.shutdown
-      end
-    ensure
-      if db_volume[:type] == "LvmStorageVolume"
-        phys_libvirt_pool.shutdown
-      end
+    def xmlequal?(docroot)
+        return (docroot.attributes['type'] == @type and
+                docroot.elements['source'].elements['host'].attributes['name'] == @ipaddr and
+                docroot.elements['source'].elements['device'].attributes['path'] == @target)
     end
-  ensure
-    conn.close
-  end
 end
 
-def delete_volume(task)
-  puts "delete_volume"
+class NFSLibvirtPool < LibvirtPool
+    def initialize(ip_addr, export_path)
+        super('netfs')
+
+        @type = 'netfs'
+        @host = ip_addr
+        @remote_path = export_path
+        @name = String.random_alphanumeric
+
+        @xml.root.elements["source"].add_element("host", {"name" => @host})
+        @xml.root.elements["source"].add_element("dir", {"path" => @remote_path})
+        @xml.root.elements["source"].add_element("format", {"type" => "nfs"})
 
-  db_volume = task.storage_volume
-  if db_volume == nil
-    raise "Could not find storage volume to create"
-  end
+        @xml.root.elements["target"].elements["path"].text = "/mnt/" + @name
+    end
+
+    def create_vol(name, size, owner, group, mode)
+        # FIXME: this can actually take some time to complete (since we aren't
+        # doing sparse allocations at the moment).  During that time, whichever
+        # libvirtd we chose to use is completely hung up.  The solution is 3-fold:
+        # 1.  Allow sparse allocations in the WUI front-end
+        # 2.  Make libvirtd multi-threaded
+        # 3.  Make taskomatic multi-threaded
+        super("netfs", name, size, owner, group, mode)
+
+        # FIXME: we have to add the format as raw here because of a bug in libvirt;
+        # if you specify a volume with no format, it will crash libvirtd
+        @vol_xml.root.elements["target"].add_element("format", {"type" => "raw"})
+        @remote_pool.createVolumeXML(@vol_xml.to_s)
+    end
 
-  db_pool = db_volume.storage_pool
-  if db_pool == nil
-    raise "Could not find storage pool"
-  end
+    def xmlequal?(docroot)
+        return (docroot.attributes['type'] == @type and
+                docroot.elements['source'].elements['host'].attributes['name'] == @host and
+                docroot.elements['source'].elements['dir'].attributes['path'] == @remote_path)
+    end
+end
 
-  conn = storage_find_suitable_host(db_pool.hardware_pool)
+class LVMLibvirtPool < LibvirtPool
+    def initialize(vg_name, device, build_on_start)
+        super('logical', vg_name)
 
-  begin
-    if db_volume[:type] == "LvmStorageVolume"
-      phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
-      phys_libvirt_pool.connect(conn)
+        @type = 'logical'
+        @build_on_start = build_on_start
+
+        @xml.root.elements["source"].add_element("name").add_text(@name)
+        @xml.root.elements["source"].add_element("device", {"path" => device})
+        @xml.root.elements["target"].elements["path"].text = "/dev/" + @name
+    end
+
+    def create_vol(name, size, owner, group, mode)
+        super("logical", name, size, owner, group, mode)
+        @remote_pool.createVolumeXML(@vol_xml.to_s)
     end
 
-    begin
-      libvirt_pool = LibvirtPool.factory(db_pool)
-      libvirt_pool.connect(conn)
-
-      begin
-        libvirt_volume = libvirt_pool.lookup_vol_by_name(db_volume.read_attribute(db_volume.volume_name))
-        # FIXME: we actually probably want to zero out the whole volume here, so
-        # we aren't potentially leaking data from one user to another.  There
-        # are two problems, though:
-        # 1)  I'm not sure how I would go about zero'ing the data on a remote
-        # machine, since there is no "libvirt_write_data" call
-        # 2)  This could potentially take quite a while, so we want to spawn
-        # off another thread to do it
-        libvirt_volume.delete
-
-        # Note: we have to nil out the task_target because when we delete the
-        # volume object, that also deletes all dependent tasks (including this
-        # one), which leads to accessing stale tasks.  Orphan the task, then
-        # delete the object; we can clean up orphans later (or not, depending
-        # on the audit policy)
-        task.task_target = nil
-        task.save!
-
-        db_volume.destroy
-      ensure
-        libvirt_pool.shutdown
-      end
-    ensure
-      if db_volume[:type] == "LvmStorageVolume"
-        phys_libvirt_pool.shutdown
-      end
+    def xmlequal?(docroot)
+        return (docroot.attributes['type'] == @type and
+                docroot.elements['name'].text == @name and
+                docroot.elements['source'].elements['name'] and
+                docroot.elements['source'].elements['name'].text == @name)
     end
-  ensure
-    conn.close
-  end
 end
+
diff --git a/src/task-omatic/task_vm.rb b/src/task-omatic/task_vm.rb
index c187287..ae44a90 100644
--- a/src/task-omatic/task_vm.rb
+++ b/src/task-omatic/task_vm.rb
@@ -19,684 +19,124 @@
 require 'rexml/document'
 include REXML
 
-require 'utils'
-
 gem 'cobbler'
 require 'cobbler'
 
-def findHostSLA(vm)
-  host = nil
-
-  vm.vm_resource_pool.get_hardware_pool.hosts.each do |curr|
-    # FIXME: we probably need to add in some notion of "load" into this check
-    if curr.num_cpus >= vm.num_vcpus_allocated \
-      and curr.memory >= vm.memory_allocated \
-      and not curr.is_disabled.nil? and curr.is_disabled == 0 \
-      and curr.state == Host::STATE_AVAILABLE \
-      and (vm.host_id.nil? or (not vm.host_id.nil? and vm.host_id != curr.id))
-      host = curr
-      break
-    end
-  end
-
-  if host == nil
-    # we couldn't find a host that matches this criteria
-    raise "No host matching VM parameters could be found"
-  end
-
-  return host
-end
-
-def findHost(host_id)
-  host = Host.find(:first, :conditions => [ "id = ?", host_id])
-
-  if host == nil
-    # Hm, we didn't find the host_id.  Seems odd.  Return a failure
-    raise "Could not find host_id " + host_id.to_s
-  end
+def find_host(host_id)
+    host = Host.find(:first, :conditions => [ "id = ?", host_id])
 
-  return host
-end
-
-def connect_storage_pools(conn, storage_volumes)
-  storagedevs = []
-  storage_volumes.each do |volume|
-    # here, we need to iterate through each volume and possibly attach it
-    # to the host we are going to be using
-    db_pool = volume.storage_pool
-    if db_pool == nil
-      # Hum.  Specified by the VM description, but not in the storage pool?
-      # continue on and hope for the best
-      puts "Couldn't find pool for volume #{volume.path}; skipping"
-      next
-    end
-
-    # we have to special case LVM pools.  In that case, we need to first
-    # activate the underlying physical device, and then do the logical one
-    if volume[:type] == "LvmStorageVolume"
-      phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(volume)
-      phys_libvirt_pool.connect(conn)
+    if host == nil
+        # Hm, we didn't find the host_id.  Seems odd.  Return a failure
+        raise "Could not find host_id " + host_id.to_s
     end
 
-    libvirt_pool = LibvirtPool.factory(db_pool)
-    libvirt_pool.connect(conn)
-
-    # OK, the pool should be all set.  The last thing we need to do is get
-    # the path based on the volume name
-    storagedevs << libvirt_pool.lookup_vol_by_name(volume.read_attribute(volume.volume_name)).path
-  end
-
-  return storagedevs
+    return host
 end
 
-def remove_pools(conn, type = nil)
-  all_storage_pools(conn).each do |remote_pool_name|
-    pool = conn.lookup_storage_pool_by_name(remote_pool_name)
-
-    if type == nil or type == Document.new(pool.xml_desc).root.attributes['type']
-      begin
-        pool.destroy
-      rescue
-      end
-
-      begin
-        # if the destroy failed, we still try to undefine; it may be a pool
-        # that was previously destroyed but not undefined for whatever reason
-        pool.undefine
-      rescue
-        # do nothing if any of this failed; the worst that happens is that
-        # we leave a pool configured
-        puts "Could not teardown pool " + remote_pool_name + "; skipping"
-      end
-    end
-  end
-end
-
-def teardown_storage_pools(conn)
-  # FIXME: this needs to get a *lot* smarter.  In particular, we want to make
-  # sure we can tear down unused pools even when there are other guests running
-  if conn.list_domains.empty?
-    # OK, there are no running guests on this host anymore.  We can teardown
-    # any storage pools that are there without fear
-
-    # we first have to tear-down LVM pools, because they might depend on the
-    # underlying physical pools
-    remove_pools(conn, "logical")
-
-    # now tear down the rest of the pools
-    remove_pools(conn)
-  end
-end
 
 def create_vm_xml(name, uuid, memAllocated, memUsed, vcpus, bootDevice,
                   macAddr, bridge, diskDevices)
-  doc = Document.new
-
-  doc.add_element("domain", {"type" => "kvm"})
-
-  doc.root.add_element("name").add_text(name)
-
-  doc.root.add_element("uuid").add_text(uuid)
-
-  doc.root.add_element("memory").add_text(memAllocated.to_s)
-
-  doc.root.add_element("currentMemory").add_text(memUsed.to_s)
-
-  doc.root.add_element("vcpu").add_text(vcpus.to_s)
-
-  doc.root.add_element("os")
-  doc.root.elements["os"].add_element("type").add_text("hvm")
-  doc.root.elements["os"].add_element("boot", {"dev" => bootDevice})
-
-  doc.root.add_element("clock", {"offset" => "utc"})
+    doc = Document.new
 
-  doc.root.add_element("on_poweroff").add_text("destroy")
+    doc.add_element("domain", {"type" => "kvm"})
 
-  doc.root.add_element("on_reboot").add_text("restart")
+    doc.root.add_element("name").add_text(name)
 
-  doc.root.add_element("on_crash").add_text("destroy")
+    doc.root.add_element("uuid").add_text(uuid)
 
-  doc.root.add_element("devices")
-  doc.root.elements["devices"].add_element("emulator").add_text("/usr/bin/qemu-kvm")
+    doc.root.add_element("memory").add_text(memAllocated.to_s)
 
-  devs = ['hda', 'hdb', 'hdc', 'hdd']
-  which_device = 0
-  diskDevices.each do |disk|
-    is_cdrom = (disk =~ /\.iso/) ? true : false
+    doc.root.add_element("currentMemory").add_text(memUsed.to_s)
 
-    diskdev = Element.new("disk")
-    diskdev.add_attribute("type", is_cdrom ? "file" : "block")
-    diskdev.add_attribute("device", is_cdrom ? "cdrom" : "disk")
+    doc.root.add_element("vcpu").add_text(vcpus.to_s)
 
-    if is_cdrom
-      diskdev.add_element("readonly")
-      diskdev.add_element("source", {"file" => disk})
-      diskdev.add_element("target", {"dev" => devs[which_device], "bus" => "ide"})
-    else
-      diskdev.add_element("source", {"dev" => disk})
-      diskdev.add_element("target", {"dev" => devs[which_device]})
-    end
-
-    doc.root.elements["devices"] << diskdev
-    which_device += 1
-  end
-
-  doc.root.elements["devices"].add_element("interface", {"type" => "bridge"})
-  doc.root.elements["devices"].elements["interface"].add_element("mac", {"address" => macAddr})
-  doc.root.elements["devices"].elements["interface"].add_element("source", {"bridge" => bridge})
-  doc.root.elements["devices"].add_element("input", {"type" => "mouse", "bus" => "ps2"})
-  doc.root.elements["devices"].add_element("graphics", {"type" => "vnc", "port" => "-1", "listen" => "0.0.0.0"})
-
-  serial = Element.new("serial")
-  serial.add_attribute("type", "pty")
-  serial.add_element("target", {"port" => "0"})
-  doc.root.elements["devices"] << serial
-
-  return doc
-end
-
-def setVmState(vm, state)
-  vm.state = state
-  vm.save!
-end
-
-def setVmVncPort(vm, domain)
-  doc = REXML::Document.new(domain.xml_desc)
-  attrib = REXML::XPath.match(doc, "//graphics/@port")
-  if not attrib.empty?:
-    vm.vnc_port = attrib.to_s.to_i
-  end
-  vm.save!
-end
-
-def findVM(task, fail_on_nil_host_id = true)
-  # find the matching VM in the vms table
-  vm = task.vm
-
-  if vm == nil
-    raise "VM not found for task " + task.id
-  end
-
-  if vm.host_id == nil && fail_on_nil_host_id
-    # in this case, we have no idea where the VM is.  How can we handle this
-    # gracefully?  We don't necessarily want to just set the VM state to off;
-    # if the machine does happen to be running somewhere and we set it to
-    # disabled here, and then start it again, we could corrupt the disk
-
-    # FIXME: the right thing to do here is probably to contact all of the
-    # hosts we know about and ensure that the domain isn't running; then we
-    # can mark it either as off (if we didn't find it), or mark the correct
-    # vm.host_id if we did.  However, if you have a large number of hosts
-    # out there, this could take a while.
-    raise "No host_id for VM " + vm.id.to_s
-  end
-
-  return vm
-end
-
-def setVmShutdown(vm)
-  vm.host_id = nil
-  vm.memory_used = nil
-  vm.num_vcpus_used = nil
-  vm.state = Vm::STATE_STOPPED
-  vm.needs_restart = nil
-  vm.vnc_port = nil
-  vm.save!
-end
+    doc.root.add_element("os")
+    doc.root.elements["os"].add_element("type").add_text("hvm")
+    doc.root.elements["os"].add_element("boot", {"dev" => bootDevice})
 
-def create_vm(task)
-  puts "create_vm"
+    doc.root.add_element("clock", {"offset" => "utc"})
 
-  vm = findVM(task, false)
+    doc.root.add_element("on_poweroff").add_text("destroy")
 
-  if vm.state != Vm::STATE_PENDING
-    raise "VM not pending"
-  end
-  setVmState(vm, Vm::STATE_CREATING)
-
-  # create cobbler system profile
-  begin
-    # FIXME: Presently the wui handles all cobbler system creation.
-    # This should be moved out of the wui into Taskomatic.  Specifically
-    # here, and in the edit_vm methods.
-
-    setVmState(vm, Vm::STATE_STOPPED)
-  rescue Exception => error
-    setVmState(vm, Vm::STATE_CREATE_FAILED)
-    raise "Unable to create system: #{error.message}"
-  end
-end
-
-def shut_or_destroy_vm(task, which)
-  # here, we are given an id for a VM to shutdown; we have to lookup which
-  # physical host it is running on
-
-  vm = findVM(task)
-
-  if vm.state == Vm::STATE_STOPPED
-    # the VM is already shutdown; just return success
-    setVmShutdown(vm)
-    return
-  elsif vm.state == Vm::STATE_SUSPENDED
-    raise "Cannot shutdown suspended domain"
-  elsif vm.state == Vm::STATE_SAVED
-    raise "Cannot shutdown saved domain"
-  end
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_STOPPING)
-
-  begin
-    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
-    dom = conn.lookup_domain_by_uuid(vm.uuid)
-    dom.send(which)
-
-    begin
-      dom.undefine
-    rescue
-      # undefine can fail, for instance, if we live migrated from A -> B, and
-      # then we are shutting down the VM on B (because it only has "transient"
-      # XML).  Therefore, just ignore undefine errors so we do the rest
-      # FIXME: we really should have a marker in the database somehow so that
-      # we can tell if this domain was migrated; that way, we can tell the
-      # difference between a real undefine failure and one because of migration
-    end
-
-    teardown_storage_pools(conn)
-
-    conn.close
-  rescue => ex
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
-
-  setVmShutdown(vm)
-end
-
-def shutdown_vm(task)
-  puts "shutdown_vm"
-  shut_or_destroy_vm(task, "shutdown")
-end
-
-def poweroff_vm(task)
-  puts "poweroff_vm"
-  shut_or_destroy_vm(task, "destroy")
-end
+    doc.root.add_element("on_reboot").add_text("restart")
 
-def start_vm(task)
-  puts "start_vm"
+    doc.root.add_element("on_crash").add_text("destroy")
 
-  # here, we are given an id for a VM to start
+    doc.root.add_element("devices")
+    doc.root.elements["devices"].add_element("emulator").add_text("/usr/bin/qemu-kvm")
 
-  vm = findVM(task, false)
+    devs = ['hda', 'hdb', 'hdc', 'hdd']
+    which_device = 0
+    diskDevices.each do |disk|
+        is_cdrom = (disk =~ /\.iso/) ? true : false
 
-  if vm.state == Vm::STATE_RUNNING
-    # the VM is already running; just return success
-    return
-  elsif vm.state == Vm::STATE_SUSPENDED
-    raise "Cannot start suspended domain"
-  elsif vm.state == Vm::STATE_SAVED
-    raise "Cannot start saved domain"
-  end
+        diskdev = Element.new("disk")
+        diskdev.add_attribute("type", is_cdrom ? "file" : "block")
+        diskdev.add_attribute("device", is_cdrom ? "cdrom" : "disk")
 
-  # FIXME: Validate that the VM is still within quota
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_STARTING)
-
-  begin
-    if vm.host_id != nil
-      # OK, marked in the database as already running on a host; for now, we
-      # will just fail the operation
-
-      # FIXME: we probably want to go out to the host it is marked on and check
-      # things out, just to make sure things are consistent
-      raise "VM already running"
-    end
-
-    # OK, now that we found the VM, go looking in the hardware_pool
-    # hosts to see if there is a host that will fit these constraints
-    host = findHostSLA(vm)
-
-    # if we're booting from a CDROM the VM is an image,
-    # then we need to add the NFS mount as a storage volume for this
-    # boot
-    #
-    if (vm.boot_device == Vm::BOOT_DEV_CDROM) && vm.uses_cobbler? && (vm.cobbler_type == Vm::IMAGE_PREFIX)
-      details = Cobbler::Image.find_one(vm.cobbler_name)
-
-      raise "Image #{vm.cobbler_name} not found in Cobbler server" unless details
-
-      # extract the components of the image filename
-      image_uri = details.file
-      protocol = auth = ip_addr = export_path = filename = ""
-
-      protocol, image_uri = image_uri.split("://") if image_uri.include?("://")
-      auth, image_uri = image_uri.split("@") if image_uri.include?("@")
-      # it's ugly, but string.split returns an empty string as the first
-      # result here, so we'll just ignore it
-      ignored, ip_addr, image_uri =
-	image_uri.split(/^([^\/]+)(\/.*)/) unless image_uri =~ /^\//
-      ignored, export_path, filename =
-	image_uri.split(/^(.*)\/(.+)/)
-
-      found = false
-
-      vm.storage_volumes.each do |volume|
-        if volume.filename == filename
-          if (volume.storage_pool.ip_addr == ip_addr) &&
-          (volume.storage_pool.export_path == export_path)
-            found = true
-          end
+        if is_cdrom
+            diskdev.add_element("readonly")
+            diskdev.add_element("source", {"file" => disk})
+            diskdev.add_element("target", {"dev" => devs[which_device], "bus" => "ide"})
+        else
+            diskdev.add_element("source", {"dev" => disk})
+            diskdev.add_element("target", {"dev" => devs[which_device]})
         end
-      end
-
-      unless found
-        # Create a new transient NFS storage volume
-        # This volume is *not* persisted.
-        image_volume = StorageVolume.factory("NFS",
-          :filename => filename
-        )
-
-        image_volume.storage_pool
-        image_pool = StoragePool.factory(StoragePool::NFS)
-
-        image_pool.ip_addr = ip_addr
-        image_pool.export_path = export_path
-        image_pool.storage_volumes << image_volume
-        image_volume.storage_pool = image_pool
-      end
-    end
 
-    volumes = []
-    volumes += vm.storage_volumes
-    volumes << image_volume if image_volume
-
-    conn = Libvirt::open("qemu+tcp://" + host.hostname + "/system")
-
-    begin
-      storagedevs = connect_storage_pools(conn, volumes)
-
-      dom = nil
-      begin
-        # FIXME: get rid of the hardcoded bridge
-        xml = create_vm_xml(vm.description, vm.uuid, vm.memory_allocated,
-                            vm.memory_used, vm.num_vcpus_allocated,
-                            vm.boot_device, vm.vnic_mac_addr, "ovirtbr0",
-                            storagedevs)
-        dom = conn.define_domain_xml(xml.to_s)
-        dom.create
-
-        setVmVncPort(vm, dom)
-      rescue
-        if dom != nil
-          dom.undefine
-        end
-        teardown_storage_pools(conn)
-        raise ex
-      end
-    ensure
-      conn.close
+        doc.root.elements["devices"] << diskdev
+        which_device += 1
     end
-  rescue => ex
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
-
-  vm.host_id = host.id
-  vm.state = Vm::STATE_RUNNING
-  vm.memory_used = vm.memory_allocated
-  vm.num_vcpus_used = vm.num_vcpus_allocated
-  vm.boot_device = Vm::BOOT_DEV_HD
-  vm.save!
-end
-
-def save_vm(task)
-  puts "save_vm"
-
-  # here, we are given an id for a VM to suspend
-
-  vm = findVM(task)
-
-  if vm.state == Vm::STATE_SAVED
-    # the VM is already saved; just return success
-    return
-  elsif vm.state == Vm::STATE_SUSPENDED
-    raise "Cannot save suspended domain"
-  elsif vm.state == Vm::STATE_STOPPED
-    raise "Cannot save shutdown domain"
-  end
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_SAVING)
-
-  begin
-    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
-    dom = conn.lookup_domain_by_uuid(vm.uuid)
-    dom.save("/tmp/" + vm.uuid + ".save")
-    conn.close
-  rescue => ex
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
 
-  # note that we do *not* reset the host_id here, since we stored the saved
-  # vm state information locally.  restore_vm will pick it up from here
+    doc.root.elements["devices"].add_element("interface", {"type" => "bridge"})
+    doc.root.elements["devices"].elements["interface"].add_element("mac", {"address" => macAddr})
+    doc.root.elements["devices"].elements["interface"].add_element("source", {"bridge" => bridge})
+    doc.root.elements["devices"].add_element("input", {"type" => "mouse", "bus" => "ps2"})
+    doc.root.elements["devices"].add_element("graphics", {"type" => "vnc", "port" => "-1", "listen" => "0.0.0.0"})
 
-  # FIXME: it would be much nicer to be able to save the VM and remove the
-  # the host_id and undefine the XML; that way we could resume it on another
-  # host later.  This can be done once we have the storage APIs, but it will
-  # need more work
+    serial = Element.new("serial")
+    serial.add_attribute("type", "pty")
+    serial.add_element("target", {"port" => "0"})
+    doc.root.elements["devices"] << serial
 
-  setVmState(vm, Vm::STATE_SAVED)
+    return doc
 end
 
-def restore_vm(task)
-  puts "restore_vm"
-
-  # here, we are given an id for a VM to start
-
-  vm = findVM(task)
-
-  if vm.state == Vm::STATE_RUNNING
-    # the VM is already saved; just return success
-    return
-  elsif vm.state == Vm::STATE_SUSPENDED
-    raise "Cannot restore suspended domain"
-  elsif vm.state == Vm::STATE_STOPPED
-    raise "Cannot restore shutdown domain"
-  end
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_RESTORING)
-
-  begin
-    # FIXME: we should probably go out to the host and check what it thinks
-    # the state is
-
-    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
-    dom = conn.lookup_domain_by_uuid(vm.uuid)
-    dom.restore
-
-    setVmVncPort(vm, dom)
-
-    conn.close
-  rescue => ex
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
-
-  setVmState(vm, Vm::STATE_RUNNING)
-end
-
-def suspend_vm(task)
-  puts "suspend_vm"
-
-  # here, we are given an id for a VM to suspend; we have to lookup which
-  # physical host it is running on
-
-  vm = findVM(task)
-
-  if vm.state == Vm::STATE_SUSPENDED
-    # the VM is already suspended; just return success
-    return
-  elsif vm.state == Vm::STATE_STOPPED
-    raise "Cannot suspend stopped domain"
-  elsif vm.state == Vm::STATE_SAVED
-    raise "Cannot suspend saved domain"
-  end
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_SUSPENDING)
-
-  begin
-    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
-    dom = conn.lookup_domain_by_uuid(vm.uuid)
-    dom.suspend
-    conn.close
-  rescue => ex
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
-
-  # note that we do *not* reset the host_id here, since we just suspended the VM
-  # resume_vm will pick it up from here
-
-  setVmState(vm, Vm::STATE_SUSPENDED)
-end
-
-def resume_vm(task)
-  puts "resume_vm"
-
-  # here, we are given an id for a VM to start
-
-  vm = findVM(task)
-
-  # OK, marked in the database as already running on a host; let's check it
-
-  if vm.state == Vm::STATE_RUNNING
-    # the VM is already suspended; just return success
-    return
-  elsif vm.state == Vm::STATE_STOPPED
-    raise "Cannot resume stopped domain"
-  elsif vm.state == Vm::STATE_SAVED
-    raise "Cannot resume suspended domain"
-  end
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_RESUMING)
-
-  begin
-    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
-    dom = conn.lookup_domain_by_uuid(vm.uuid)
-    dom.resume
-    conn.close
-  rescue => ex
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
-
-  setVmState(vm, Vm::STATE_RUNNING)
+def set_vm_state(vm, state)
+    vm.state = state
+    vm.save!
 end
 
-def update_state_vm(task)
-  puts "update_state_vm"
-
-  # NOTE: findVM() will only return a vm if all the host information is filled
-  # in.  So if a vm that we thought was stopped is running, this returns nil
-  # and we don't update any information about it.  The tricky part
-  # is that we're still not sure what to do in this case :).  - Ian
-  #
-  # Actually for migration it is necessary that it be able to update
-  # the host and state of the VM once it is migrated.
-  vm = findVM(task, false)
-  new_vm_state, host_id_str = task.args.split(",")
-  if (vm.host_id == nil) and host_id_str
-    vm.host_id = host_id_str.to_i
-  end
-
-
-  vm_effective_state = Vm::EFFECTIVE_STATE[vm.state]
-  task_effective_state = Vm::EFFECTIVE_STATE[new_vm_state]
-
-  if vm_effective_state != task_effective_state
-    vm.state = new_vm_state
-
-    if task_effective_state == Vm::STATE_STOPPED
-      setVmShutdown(vm)
+def set_vm_vnc_port(vm, xml_desc)
+    doc = REXML::Document.new(xml_desc)
+    attrib = REXML::XPath.match(doc, "//graphics/@port")
+    if not attrib.empty?
+        vm.vnc_port = attrib.first.value.to_i
     end
     vm.save!
-    puts "Updated state to " + new_vm_state
-  end
 end
 
-def migrate(vm, dest = nil)
-  if vm.state == Vm::STATE_STOPPED
-    raise "Cannot migrate stopped domain"
-  elsif vm.state == Vm::STATE_SUSPENDED
-    raise "Cannot migrate suspended domain"
-  elsif vm.state == Vm::STATE_SAVED
-    raise "Cannot migrate saved domain"
-  end
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_MIGRATING)
-
-  begin
-    src_host = findHost(vm.host_id)
-    unless dest.nil? or dest.empty?
-      if dest.to_i == vm.host_id
-        raise "Cannot migrate from host " + src_host.hostname + " to itself!"
-      end
-      dst_host = findHost(dest.to_i)
-    else
-      dst_host = findHostSLA(vm)
+def find_vm(task, fail_on_nil_host_id = true)
+    # find the matching VM in the vms table
+    vm = task.vm
+
+    if vm == nil
+        raise "VM not found for task #{task.id}"
     end
 
-    src_conn = Libvirt::open("qemu+tcp://" + src_host.hostname + "/system")
-    dst_conn = Libvirt::open("qemu+tcp://" + dst_host.hostname + "/system")
-
-    connect_storage_pools(dst_conn, vm)
-
-    dom = src_conn.lookup_domain_by_uuid(vm.uuid)
-    dom.migrate(dst_conn, Libvirt::Domain::MIGRATE_LIVE)
-
-    # if we didn't raise an exception, then the migration was successful.  We
-    # still have a pointer to the now-shutdown domain on the source side, so
-    # undefine it
-    begin
-      dom.undefine
-    rescue
-      # undefine can fail, for instance, if we live migrated from A -> B, and
-      # then we are shutting down the VM on B (because it only has "transient"
-      # XML).  Therefore, just ignore undefine errors so we do the rest
-      # FIXME: we really should have a marker in the database somehow so that
-      # we can tell if this domain was migrated; that way, we can tell the
-      # difference between a real undefine failure and one because of migration
+    if vm.host_id == nil && fail_on_nil_host_id
+        raise "No host_id for VM " + vm.id.to_s
     end
 
-    teardown_storage_pools(src_conn)
-    dst_conn.close
-    src_conn.close
-  rescue => ex
-    # FIXME: ug.  We may have open connections that we need to close; not
-    # sure how to handle that
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
-
-  setVmState(vm, Vm::STATE_RUNNING)
-  vm.host_id = dst_host.id
-  vm.save!
+    return vm
 end
 
-def migrate_vm(task)
-  puts "migrate_vm"
-
-  # here, we are given an id for a VM to migrate; we have to lookup which
-  # physical host it is running on
-
-  vm = findVM(task)
-
-  migrate(vm, task.args)
+def set_vm_shut_down(vm)
+    vm.host_id = nil
+    vm.memory_used = nil
+    vm.num_vcpus_used = nil
+    vm.state = Vm::STATE_STOPPED
+    vm.needs_restart = nil
+    vm.vnc_port = nil
+    vm.save!
 end
+
diff --git a/src/task-omatic/taskomatic.rb b/src/task-omatic/taskomatic.rb
index ce37058..de56a90 100755
--- a/src/task-omatic/taskomatic.rb
+++ b/src/task-omatic/taskomatic.rb
@@ -1,7 +1,7 @@
 #!/usr/bin/ruby
-# 
+#
 # Copyright (C) 2008 Red Hat, Inc.
-# Written by Chris Lalancette <clalance at redhat.com>
+# Written by Chris Lalancette <clalance at redhat.com> and Ian Main <imain at redhat.com>
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -22,122 +22,811 @@ $: << File.join(File.dirname(__FILE__), "../dutils")
 $: << File.join(File.dirname(__FILE__), ".")
 
 require 'rubygems'
+require "qpid"
+require 'monitor'
+require 'dutils'
 require 'optparse'
 require 'daemons'
 include Daemonize
 
-$logfile = '/var/log/ovirt-server/taskomatic.log'
-
-do_daemon = true
-sleeptime = 5
-opts = OptionParser.new do |opts|
-  opts.on("-h", "--help", "Print help message") do
-    puts opts
-    exit
-  end
-  opts.on("-n", "--nodaemon", "Run interactively (useful for debugging)") do |n|
-    do_daemon = !n
-  end
-  opts.on("-s N", Integer, "--sleep", "Seconds to sleep between iterations (default is 5 seconds)") do |s|
-    sleeptime = s
-  end
-end
-begin
-  opts.parse!(ARGV)
-rescue OptionParser::InvalidOption
-  puts opts
-  exit
-end
+require 'task_vm'
+require 'task_storage'
 
-if do_daemon
-  daemonize
-  STDOUT.reopen $logfile, 'a'
-  STDERR.reopen STDOUT
-end
+class TaskOmatic
 
-begin
-  require 'dutils'
-rescue => ex
-  puts "dutils require failed! #{ex.class}: #{ex.message}"
-end
+    include MonitorMixin
 
-require 'task_vm'
-require 'task_storage'
-require 'task_host'
-
-loop do
-  tasks = Array.new
-  begin
-    tasks = Task.find(:all, :conditions => [ "state = ?", Task::STATE_QUEUED ])
-  rescue => ex
-    puts "1 #{ex.class}: #{ex.message}"
-    if Task.connected?
-      begin
-        ActiveRecord::Base.connection.reconnect!
-      rescue => norecon
-        puts "2 #{norecon.class}: #{norecon.message}"
-      end
-    else
-      begin
-        database_connect
-      rescue => ex
-        puts "3 #{ex.class}: #{ex.message}"
-      end
-    end
-  end
-  tasks.each do |task|
-    # make sure we get our credentials up-front
-    get_credentials
-
-    task.time_started = Time.now
-    task.state = Task::STATE_RUNNING
-    task.save!
-
-    state = Task::STATE_FINISHED
-    begin
-      case task.action
-      when VmTask::ACTION_CREATE_VM then create_vm(task)
-      when VmTask::ACTION_SHUTDOWN_VM then shutdown_vm(task)
-      when VmTask::ACTION_POWEROFF_VM then poweroff_vm(task)
-      when VmTask::ACTION_START_VM then start_vm(task)
-      when VmTask::ACTION_SUSPEND_VM then suspend_vm(task)
-      when VmTask::ACTION_RESUME_VM then resume_vm(task)
-      when VmTask::ACTION_SAVE_VM then save_vm(task)
-      when VmTask::ACTION_RESTORE_VM then restore_vm(task)
-      when VmTask::ACTION_UPDATE_STATE_VM then update_state_vm(task)
-      when VmTask::ACTION_MIGRATE_VM then migrate_vm(task)
-      when StorageTask::ACTION_REFRESH_POOL then refresh_pool(task)
-      when StorageVolumeTask::ACTION_CREATE_VOLUME then create_volume(task)
-      when StorageVolumeTask::ACTION_DELETE_VOLUME then delete_volume(task)
-      when HostTask::ACTION_CLEAR_VMS then clear_vms_host(task)
-      else
-        puts "unknown task " + task.action
-        state = Task::STATE_FAILED
-        task.message = "Unknown task type"
-      end
-    rescue => ex
-      puts "Task action processing failed: #{ex.class}: #{ex.message}"
-      puts ex.backtrace
-      state = Task::STATE_FAILED
-      task.message = ex.message
-    end
-
-    task.state = state
-    task.time_ended = Time.now
-    task.save!
-    puts "done"
-  end
-
-  # FIXME: here, we clean up "orphaned" tasks.  These are tasks that we had
-  # to orphan (set task_target to nil) because we were deleting the object they
-  # depended on.
-  Task.find(:all, :conditions => [ "task_target_id IS NULL and task_target_type IS NULL" ]).each do |task|
-    task.destroy
-  end
-  
-  # we could destroy credentials, but another process might be using them (in
-  # particular, host-browser).  Just leave them around, it shouldn't hurt
-  
-  STDOUT.flush
-  sleep sleeptime
+    $logfile = '/var/log/ovirt-server/taskomatic.log'
+
+    def initialize()
+        super()
+
+        @sleeptime = 5
+        @nth_host = 0
+
+        @session = Qpid::Qmf::Session.new()
+        # FIXME: Should come from some kind of config or DNS SRV or what have you.
+        @broker = @session.add_broker("amqp://localhost:5672")
+
+        do_daemon = true
+
+        opts = OptionParser.new do |opts|
+            opts.on("-h", "--help", "Print help message") do
+                puts opts
+                exit
+            end
+            opts.on("-n", "--nodaemon", "Run interactively (useful for debugging)") do |n|
+                do_daemon = false
+            end
+            opts.on("-s N", Integer, "--sleep", "Seconds to sleep between iterations (default is 5 seconds)") do |s|
+                @sleeptime = s
+            end
+        end
+        begin
+            opts.parse!(ARGV)
+        rescue OptionParser::InvalidOption
+            puts opts
+            exit
+        end
+
+        if do_daemon
+            # XXX: This gets around a problem with paths for the database stuff.
+            # Normally daemonize would chdir to / but the paths for the database
+            # stuff are relative so it breaks it.. It's either this or rearrange
+            # things so the db stuff is included after daemonizing.
+            pwd = Dir.pwd
+            daemonize
+            Dir.chdir(pwd)
+            STDOUT.reopen $logfile, 'a'
+            STDERR.reopen STDOUT
+        end
+    end
+
+    def find_capable_host(db_vm)
+        possible_hosts = []
+
+        vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid)
+
+        db_vm.vm_resource_pool.get_hardware_pool.hosts.each do |curr|
+            # Now each of 'curr' is in the right hardware pool.. now we check them out.
+
+            node = @session.object(:class => "node", 'hostname' => curr.hostname)
+            next unless node
+
+            # If the node was found we assume it is alive and well.  We then check
+            # that it has enough real cores for the number of vcpus, that its
+            # memory is adequate, and that it is not disabled in the database.
+            # Finally, if the VM is already running somewhere (it has a node set),
+            # the caller is probably looking to migrate it, so we only accept a
+            # node that is not the one the VM is currently on.
+            #
+            # In the future we could add load or similar checks here.
+
+            #puts "checking node, #{node.cores} >= #{db_vm.num_vcpus_allocated},"
+            #puts "and #{node.memory} >= #{db_vm.memory_allocated}"
+            #puts "and not #{curr.is_disabled.nil?} and #{curr.is_disabled == 0}"
+            #puts "and #{vm ? vm : 'nil'} or #{vm ? vm.active : 'nil'}) or #{vm ? vm.node : 'nil'} != #{node.object_id}"
+
+            if node and node.cores >= db_vm.num_vcpus_allocated \
+               and node.memory >= db_vm.memory_allocated \
+               and not curr.is_disabled.nil? and curr.is_disabled == 0 \
+               and ((!vm or vm.active == 'false') or vm.node != node.object_id)
+                possible_hosts.push(curr)
+            end
+        end
+
+        #puts "possible_hosts.length = #{possible_hosts.length}"
+        if possible_hosts.length == 0
+            # we couldn't find a host that matches these criteria
+            raise "No host matching VM parameters could be found"
+        end
+
+        # XXX: Right now we're just picking the nth host, we could also look at
+        # how many vms are already on it, or the load of the hosts etc.
+        host = possible_hosts[@nth_host % possible_hosts.length]
+        @nth_host += 1
+
+        return host
+    end
+
+    def connect_storage_pools(node, storage_volumes)
+        storagedevs = []
+        storage_volumes.each do |db_volume|
+            # here, we need to iterate through each volume and possibly attach it
+            # to the host we are going to be using
+            db_pool = db_volume.storage_pool
+            if db_pool == nil
+                # Hum.  Specified by the VM description, but not in the storage pool?
+                # continue on and hope for the best
+                puts "Couldn't find pool for volume #{db_volume.path}; skipping"
+                next
+            end
+
+            # we have to special case LVM pools.  In that case, we need to first
+            # activate the underlying physical device, and then do the logical one
+            if db_volume[:type] == "LvmStorageVolume"
+                phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
+                phys_libvirt_pool.connect(@session, node)
+            end
+
+            libvirt_pool = LibvirtPool.factory(db_pool)
+            libvirt_pool.connect(@session, node)
+
+            # OK, the pool should be all set.  The last thing we need to do is get
+            # the path based on the volume name
+
+            volume_name = db_volume.read_attribute(db_volume.volume_name)
+            pool = libvirt_pool.remote_pool
+            volume = @session.object(:class => 'volume',
+                                     'name' => volume_name,
+                                     'storagePool' => pool.object_id)
+            raise "Unable to find volume #{volume_name} attached to pool #{pool.name}." unless volume
+            storagedevs << volume.path
+        end
+
+        return storagedevs
+    end
+
+    def task_create_vm(task)
+        # XXX: This is mostly just a place holder.
+        vm = find_vm(task, false)
+        if vm.state != Vm::STATE_PENDING
+            raise "VM not pending"
+        end
+        vm.state = Vm::STATE_STOPPED
+        vm.save!
+    end
+
+    def teardown_storage_pools(node)
+
+        # This is rather silly because we only destroy pools if there are no
+        # more vms on the node.  We should be reference counting the pools
+        # somehow so we know when they are no longer in use.
+        vms = @session.objects(:class => 'domain', 'node' => node.object_id)
+        if vms.length > 0
+            return
+        end
+        pools = @session.objects(:class => 'pool', 'node' => node.object_id)
+
+        # FIXME: I think we should be destroying/undefining logical volumes first.
+        pools.each do |pool|
+            result = pool.destroy
+            result = pool.undefine
+        end
+    end
+
+
+    def task_shutdown_or_destroy_vm(task, action)
+        db_vm = task.vm
+        vm = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+        if !vm
+            puts "VM already shut down?"
+            return
+        end
+
+        node = @session.object(:object_id => vm.node)
+        raise "Unable to get node that vm is on??" unless node
+
+        if vm.state == "shutdown" or vm.state == "shutoff"
+            set_vm_shut_down(db_vm)
+            return
+        elsif vm.state == "suspended"
+            raise "Cannot shutdown suspended domain"
+        elsif vm.state == "saved"
+            raise "Cannot shutdown saved domain"
+        end
+
+        if action == :shutdown
+            result = vm.shutdown
+            raise "Error shutting down VM: #{result.text}" unless result.status == 0
+        elsif action == :destroy
+            result = vm.destroy
+            raise "Error destroying VM: #{result.text}" unless result.status == 0
+        end
+
+        # undefine can fail, for instance, if we live migrated from A -> B, and
+        # then we are shutting down the VM on B (because it only has "transient"
+        # XML).  Therefore, just ignore undefine errors so we do the rest
+        # FIXME: we really should have a marker in the database somehow so that
+        # we can tell if this domain was migrated; that way, we can tell the
+        # difference between a real undefine failure and one because of migration
+        result = vm.undefine
+        puts "Error undefining VM: #{result.text}" unless result.status == 0
+
+        teardown_storage_pools(node)
+
+        set_vm_shut_down(db_vm)
+    end
+
+    def task_start_vm(task)
+        db_vm = find_vm(task, false)
+
+        # XXX: Kinda silly?  I dunno about these intermediate states..
+        set_vm_state(db_vm, Vm::STATE_STARTING)
+
+        vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid)
+
+        if vm
+            case vm.state
+                when "running"
+                    return
+                when "blocked"
+                    raise "Virtual machine state is blocked, cannot start VM."
+                when "paused"
+                    raise "Virtual machine is currently paused, cannot start, must resume."
+            end
+        end
+        # FIXME: There's a bug here in that a host that's already running the vm won't be
+        # returned.  I think that's supposed to be for migration but it just breaks stuff.
+        db_host = find_capable_host(db_vm)
+
+        node = @session.object(:class => "node", 'hostname' => db_host.hostname)
+
+        raise "Unable to find host #{db_host.hostname} to create VM on." unless node
+
+        if (db_vm.boot_device == Vm::BOOT_DEV_CDROM) && db_vm.uses_cobbler? && (db_vm.cobbler_type == Vm::IMAGE_PREFIX)
+            details = Cobbler::Image.find_one(db_vm.cobbler_name)
+            raise "Image #{db_vm.cobbler_name} not found in Cobbler server" unless details
+
+            # extract the components of the image filename
+            image_uri = details.file
+            protocol = auth = ip_addr = export_path = filename = ""
+
+            protocol, image_uri = image_uri.split("://") if image_uri.include?("://")
+            auth, image_uri = image_uri.split("@") if image_uri.include?("@")
+            # it's ugly, but string.split returns an empty string as the first
+            # result here, so we'll just ignore it
+            ignored, ip_addr, image_uri =
+                    image_uri.split(/^([^\/]+)(\/.*)/) unless image_uri =~ /^\//
+            ignored, export_path, filename =
+                    image_uri.split(/^(.*)\/(.+)/)
+
+            found = false
+
+            db_vm.storage_volumes.each do |volume|
+                if volume.filename == filename
+                    if (volume.storage_pool.ip_addr == ip_addr) &&
+                        (volume.storage_pool.export_path == export_path)
+                        found = true
+                    end
+                end
+            end
+
+            unless found
+                # Create a new transient NFS storage volume
+                # This volume is *not* persisted.
+                image_volume = StorageVolume.factory("NFS", :filename => filename)
+
+                image_volume.storage_pool
+                image_pool = StoragePool.factory(StoragePool::NFS)
+
+                image_pool.ip_addr = ip_addr
+                image_pool.export_path = export_path
+                image_pool.storage_volumes << image_volume
+                image_volume.storage_pool = image_pool
+            end
+        end
+
+        # FIXME: I know this part is broken..
+        #
+        # hrrm, who wrote this comment and why is it broken?  - Ian
+        volumes = []
+        volumes += db_vm.storage_volumes
+        volumes << image_volume if image_volume
+        storagedevs = connect_storage_pools(node, volumes)
+
+        # FIXME: get rid of the hardcoded bridge
+        xml = create_vm_xml(db_vm.description, db_vm.uuid, db_vm.memory_allocated,
+                            db_vm.memory_used, db_vm.num_vcpus_allocated, db_vm.boot_device,
+                            db_vm.vnic_mac_addr, "ovirtbr0", storagedevs)
+
+        result = node.domainDefineXML(xml.to_s)
+        raise "Error defining virtual machine: #{result.text}" unless result.status == 0
+
+        domain = @session.object(:object_id => result.domain)
+        raise "Cannot find domain on host #{db_host.hostname}, cannot start virtual machine." unless domain
+
+        result = domain.create
+        if result.status != 0
+            domain.undefine
+            raise "Error creating virtual machine: #{result.text}"
+        end
+
+        result = domain.getXMLDesc
+
+        # Reget the db record or you can get 'dirty' errors.
+        db_vm = find_vm(task, false)
+        set_vm_vnc_port(db_vm, result.description) if result.status == 0
+
+        # XXX: This information is not available via the libvirt interface.
+        db_vm.memory_used = db_vm.memory_allocated
+        db_vm.boot_device = Vm::BOOT_DEV_HD
+        db_vm.host_id = db_host.id
+
+        # We write the new state here even though dbomatic will set it soon anyway.
+        # This is just to let the UI know that it's good to go right away and really
+        # dbomatic will just write the same thing over top of it soon enough.
+        db_vm.state = Vm::STATE_RUNNING
+        db_vm.save!
+    end
+
+    def task_suspend_vm(task)
+        db_vm = task.vm
+        dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+        raise "Unable to locate VM to suspend" unless dom
+
+        if dom.state == "shutdown" or dom.state == "shutoff"
+            raise "Cannot suspend stopped domain"
+        elsif dom.state == "paused"
+            raise "Cannot suspend already suspended domain"
+        end
+
+        result = dom.suspend
+        raise "Error suspending VM: #{result.text}" unless result.status == 0
+
+        db_vm.state = Vm::STATE_SUSPENDED
+        db_vm.save!
+    end
+
+    def task_resume_vm(task)
+        db_vm = task.vm
+        dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+        raise "Unable to locate VM to resume" unless dom
+
+        if dom.state == "running"
+            # the VM is already running; just return success
+            return
+        elsif dom.state == "shutoff" or dom.state == "shutdown"
+            raise "Cannot resume stopped domain"
+        elsif dom.state == "blocked"
+            raise "Cannot resume blocked domain"
+        end
+
+        result = dom.resume
+        raise "Error resuming VM: #{result.text}" unless result.status == 0
+
+        db_vm.state = Vm::STATE_RUNNING
+        db_vm.save!
+    end
+
+    def task_save_vm(task)
+        db_vm = task.vm
+        dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+        raise "Unable to locate VM to save" unless dom
+
+        #XXX: I'm not checking states here. I want to see if libvirt gives back
+        #decent error messages for different states.
+        filename = "/tmp/#{dom.uuid}.save"
+        puts "saving vm #{dom.name} to #{filename}"
+        result = dom.save(filename)
+        raise "Error saving VM: #{result.text}" unless result.status == 0
+
+        set_vm_state(db_vm, Vm::STATE_SAVED)
+    end
+
+    def task_restore_vm(task)
+        db_vm = task.vm
+        dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+        raise "Unable to locate VM to restore" unless dom
+
+        #XXX: I'm not checking states here. I want to see if libvirt gives back
+        #decent error messages for different states.
+
+        filename = "/tmp/#{dom.uuid}.save"
+        puts "restoring vm #{dom.name} from #{filename}"
+        result = dom.restore(filename)
+        raise "Error restoring VM: #{result.text}" unless result.status == 0
+
+        set_vm_state(db_vm, Vm::STATE_RUNNING)
+    end
+
+    def migrate(db_vm, dest = nil)
+
+        vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid)
+        raise "Unable to find VM to migrate" unless vm
+        src_node = @session.object(:object_id => vm.node)
+        raise "Unable to find node that VM is on??" unless src_node
+
+        puts "Migrating domain lookup complete, domain is #{vm}"
+
+        case vm.state
+            when "blocked"
+                raise "Unable to migrate blocked VM."
+            when "paused"
+                raise "Unable to migrate suspended VM."
+        end
+
+        vm_orig_state = db_vm.state
+        set_vm_state(db_vm, Vm::STATE_MIGRATING)
+
+        begin
+            unless dest.nil? or dest.empty?
+                if dest.to_i == db_vm.host_id
+                    raise "Cannot migrate from host " + src_node.hostname + " to itself!"
+                end
+                db_dst_host = find_host(dest.to_i)
+            else
+                db_dst_host = find_capable_host(db_vm)
+            end
+
+            dest_node = @session.object(:class => 'node', 'hostname' => db_dst_host.hostname)
+            raise "Unable to find host #{db_dst_host.hostname} to migrate to." unless dest_node
+
+            volumes = []
+            volumes += db_vm.storage_volumes
+            connect_storage_pools(dest_node, volumes)
+
+            # Sadly migrate with qpid is broken because it requires a connection between
+            # both nodes and currently that can't happen securely.  For now we do it
+            # the old fashioned way..
+            dst_uri = "qemu+tcp://#{dest_node.hostname}/system"
+            src_uri = "qemu+tcp://#{src_node.hostname}/system"
+            src_conn = Libvirt::open(src_uri)
+            dst_conn = Libvirt::open(dst_uri)
+            dom = src_conn.lookup_domain_by_uuid(vm.uuid)
+            dom.migrate(dst_conn, Libvirt::Domain::MIGRATE_LIVE)
+            src_conn.close
+            dst_conn.close
+
+            # undefine can fail, for instance, if we live migrated from A -> B, and
+            # then we are shutting down the VM on B (because it only has "transient"
+            # XML).  Therefore, just ignore undefine errors so we do the rest
+            # FIXME: we really should have a marker in the database somehow so that
+            # we can tell if this domain was migrated; that way, we can tell the
+            # difference between a real undefine failure and one because of migration
+            result = vm.undefine
+            puts "Error undefining old vm after migrate: #{result.text}" unless result.status == 0
+
+            # See if we can take down storage pools on the src host.
+            teardown_storage_pools(src_node)
+        rescue => ex
+            puts "Error: #{ex}"
+            set_vm_state(db_vm, vm_orig_state)
+            raise ex
+        end
+
+        db_vm.state = Vm::STATE_RUNNING
+        db_vm.host_id = db_dst_host.id
+        db_vm.save!
+    end
+
+    def task_migrate_vm(task)
+        puts "migrate_vm"
+
+        # here, we are given an id for a VM to migrate; we have to lookup which
+        # physical host it is running on
+        vm = find_vm(task)
+        migrate(vm, task.args)
+    end
+
+    def storage_find_suitable_host(hardware_pool)
+        # find all of the hosts in the same pool as the storage
+        hardware_pool.hosts.each do |host|
+            puts "storage_find_suitable_host: host #{host.hostname} uuid #{host.uuid}"
+            node = @session.object(:class => 'node', 'hostname' => host.hostname)
+            return node if node
+        end
+
+        raise "Could not find a host within this storage pool to scan the storage server."
+    end
+
+    def add_volumes_to_db(db_pool, libvirt_pool, owner = nil, group = nil, mode = nil)
+        # FIXME: this is currently broken if you do something like:
+        # 1.  Add an iscsi pool with 3 volumes (lun-1, lun-2, lun-3)
+        # 2.  Scan it in
+        # 3.  Remove lun-3 from the pool
+        # 4.  Re-scan it
+        # What will happen is that you will still have lun-3 available in the
+        # database, even though it's not available in the pool anymore.  It's a
+        # little tricky, though; we have to make sure that we don't pull the
+        # database entry out from underneath a possibly running VM (or do we?)
+        volumes = @session.objects(:class => 'volume', 'storagePool' => libvirt_pool.remote_pool.object_id)
+        volumes.each do |volume|
+            storage_volume = StorageVolume.factory(db_pool.get_type_label)
+
+            # NOTE: it is safe (and, in fact, necessary) to use
+            # #{storage_volume.volume_name} here without sanitizing it.  This is
+            # because this is *not* based on user modifiable data, but rather, on an
+            # internal implementation detail
+            existing_vol = StorageVolume.find(:first, :conditions =>
+                                              ["storage_pool_id = ? AND #{storage_volume.volume_name} = ?",
+                                              db_pool.id, volume.name])
+
+            # in this case, this path already exists in the database; just skip
+            next if existing_vol
+
+            storage_volume = StorageVolume.factory(db_pool.get_type_label)
+            storage_volume.path = volume.path
+            storage_volume.size = volume.capacity / 1024
+            storage_volume.storage_pool_id = db_pool.id
+            storage_volume.write_attribute(storage_volume.volume_name, volume.name)
+            storage_volume.lv_owner_perms = owner
+            storage_volume.lv_group_perms = group
+            storage_volume.lv_mode_perms = mode
+            storage_volume.state = StorageVolume::STATE_AVAILABLE
+            puts "saving storage volume to db."
+            storage_volume.save!
+        end
+    end
+
+    # The words "pool" and "volume" are ridiculously overloaded in our context.
+    # Therefore, the refresh_pool method adopts this convention:
+    # db_pool_phys: The underlying physical storage pool, as it is represented in
+    #               the database
+    # phys_libvirt_pool: The underlying physical storage, as it is represented in
+    #                    libvirt
+    # db_lvm_pool: The logical storage pool (if it exists), as it is represented
+    #              in the database
+    # lvm_libvirt_pool: The logical storage pool (if it exists), as it is
+    #                   represented in libvirt
+
+    def task_refresh_pool(task)
+        puts "refresh_pool"
+
+        db_pool_phys = task.storage_pool
+        raise "Could not find storage pool" unless db_pool_phys
+
+        node = storage_find_suitable_host(db_pool_phys.hardware_pool)
+
+        begin
+            phys_libvirt_pool = LibvirtPool.factory(db_pool_phys)
+            phys_libvirt_pool.connect(@session, node)
+
+            begin
+                # OK, the pool is all set.  Add in all of the volumes
+                add_volumes_to_db(db_pool_phys, phys_libvirt_pool)
+
+                db_pool_phys.state = StoragePool::STATE_AVAILABLE
+                db_pool_phys.save!
+
+                # OK, now we've scanned the underlying hardware pool and added the
+                # volumes.  Next we scan for pre-existing LVM volumes
+                result = node.findStoragePoolSources("logical", nil)
+                raise "Error finding logical volumes in pool: #{result.text}" unless result.status == 0
+                logical_xml = result.xmlDesc
+
+                Document.new(logical_xml).elements.each('sources/source') do |source|
+                    vgname = source.elements["name"].text
+
+                    # If matching any of the <device> sections in the LVM XML fails
+                    # against the storage pool, then it is likely that this is a storage
+                    # pool not associated with the one we connected above.  Go on
+                    # FIXME: it would be nicer to catch the right exception here, and
+                    # fail on other exceptions
+                    source.elements.each("device") do |device|
+                        log_vol = @session.object(:class => 'volume',
+                                                  'path' => device.attributes["path"],
+                                                  'storagePool' => phys_libvirt_pool.remote_pool.object_id)
+                        puts "Didn't find logical volume for device #{device}" unless log_vol
+                        next unless log_vol
+                    end
+
+                    # if we make it here, then we were able to resolve all of the devices,
+                    # so we know we need to use a new pool
+                    db_lvm_pool = LvmStoragePool.find(:first, :conditions =>
+                                                      ["vg_name = ?", vgname])
+                    if db_lvm_pool == nil
+                        puts "Adding lvm storage pool to database."
+                        db_lvm_pool = LvmStoragePool.new
+                        db_lvm_pool[:type] = "LvmStoragePool"
+                        # set the LVM pool to the same hardware pool as the underlying storage
+                        db_lvm_pool.hardware_pool_id = db_pool_phys.hardware_pool_id
+                        db_lvm_pool.vg_name = vgname
+                        db_lvm_pool.save!
+                    end
+
+                    source.elements.each("device") do |device|
+                        log_vol = @session.object(:class => 'volume',
+                                                  'path' => device.attributes["path"],
+                                                  'storagePool' => phys_libvirt_pool.remote_pool.object_id)
+                        if !log_vol
+                            puts "Unable to find logical volume with path #{device.attributes["path"]} on host"
+                            next
+                        end
+
+                        physical_vol = StorageVolume.find(:first, :conditions =>
+                                                          ["path = ?", log_vol.path])
+                        if physical_vol == nil
+                            # Hm. We didn't find the device in the storage volumes already.
+                            # something went wrong internally, and we have to bail
+                            raise "Storage internal physical volume error"
+                        end
+
+                        # OK, put the right lvm_pool_id in place
+                        physical_vol.lvm_pool_id = db_lvm_pool.id
+                        physical_vol.save!
+                    end
+
+                    lvm_libvirt_pool = LibvirtPool.factory(db_lvm_pool)
+                    lvm_libvirt_pool.connect(@session, node)
+
+                    # Do this in a block just in case we have some issue with the database.
+                    begin
+                        add_volumes_to_db(db_lvm_pool, lvm_libvirt_pool, "0744", "0744", "0744")
+                    ensure
+                        lvm_libvirt_pool.shutdown
+                    end
+                end
+            end
+        ensure
+            phys_libvirt_pool.shutdown if phys_libvirt_pool
+        end
+    end
+
+    def task_create_volume(task)
+        puts "create_volume"
+
+        db_volume = task.storage_volume
+        raise "Could not find storage volume to create" unless db_volume
+
+        db_pool = db_volume.storage_pool
+        raise "Could not find storage pool" unless db_pool
+
+        node = storage_find_suitable_host(db_pool.hardware_pool)
+
+        begin
+            if db_volume[:type] == "LvmStorageVolume"
+                phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
+                phys_libvirt_pool.connect(@session, node)
+            end
+
+            begin
+                libvirt_pool = LibvirtPool.factory(db_pool)
+
+                begin
+                    libvirt_pool.connect(@session, node)
+
+                    libvirt_pool.create_vol(*db_volume.volume_create_params)
+                    db_volume.state = StorageVolume::STATE_AVAILABLE
+                    db_volume.save!
+
+                    db_pool.state = StoragePool::STATE_AVAILABLE
+                    db_pool.save!
+                ensure
+                    libvirt_pool.shutdown
+                end
+            ensure
+                if db_volume[:type] == "LvmStorageVolume"
+                    phys_libvirt_pool.shutdown
+                end
+            end
+        end
+    end
+
+    def task_delete_volume(task)
+        puts "delete_volume"
+
+        db_volume = task.storage_volume
+        raise "Could not find storage volume to delete" unless db_volume
+
+        db_pool = db_volume.storage_pool
+        raise "Could not find storage pool" unless db_pool
+
+        node = storage_find_suitable_host(db_pool.hardware_pool)
+
+        begin
+            if db_volume[:type] == "LvmStorageVolume"
+                phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
+                phys_libvirt_pool.connect(@session, node)
+            end
+
+            begin
+                libvirt_pool = LibvirtPool.factory(db_pool)
+                libvirt_pool.connect(@session, node)
+
+                begin
+                    volume = @session.object(:class => 'volume', 'path' => db_volume.path, 'storagePool' => libvirt_pool.remote_pool.object_id)
+                    raise "Unable to find volume to delete" unless volume
+
+                    # FIXME: we actually probably want to zero out the whole volume here, so
+                    # we aren't potentially leaking data from one user to another.  There
+                    # are two problems, though:
+                    # 1)  I'm not sure how I would go about zero'ing the data on a remote
+                    # machine, since there is no "libvirt_write_data" call
+                    # 2)  This could potentially take quite a while, so we want to spawn
+                    # off another thread to do it
+                    volume.delete
+
+                    # Note: we have to nil out the task_target because when we delete the
+                    # volume object, that also deletes all dependent tasks (including this
+                    # one), which leads to accessing stale tasks.  Orphan the task, then
+                    # delete the object; we can clean up orphans later (or not, depending
+                    # on the audit policy)
+                    task.task_target = nil
+                    task.save!
+
+                    db_volume.destroy
+                ensure
+                    libvirt_pool.shutdown
+                end
+            ensure
+                if db_volume[:type] == "LvmStorageVolume"
+                    phys_libvirt_pool.shutdown
+                end
+            end
+        end
+    end
+
+    def task_clear_vms_host(task)
+        src_host = task.host
+
+        src_host.vms.each do |vm|
+            migrate(vm)
+        end
+    end
+
+    def mainloop()
+        loop do
+            tasks = Array.new
+            begin
+                tasks = Task.find(:all, :conditions => [ "state = ?", Task::STATE_QUEUED ])
+            rescue => ex
+                puts "1 #{ex.class}: #{ex.message}"
+                if Task.connected?
+                    begin
+                        ActiveRecord::Base.connection.reconnect!
+                    rescue => norecon
+                        puts "2 #{norecon.class}: #{norecon.message}"
+                    end
+                else
+                    begin
+                        database_connect
+                    rescue => ex
+                        puts "3 #{ex.class}: #{ex.message}"
+                    end
+                end
+            end
+
+            tasks.each do |task|
+                # make sure we get our credentials up-front
+                get_credentials
+
+                task.time_started = Time.now
+
+                state = Task::STATE_FINISHED
+                begin
+                    case task.action
+                        when VmTask::ACTION_CREATE_VM then task_create_vm(task)
+                        when VmTask::ACTION_SHUTDOWN_VM then task_shutdown_or_destroy_vm(task, :shutdown)
+                        when VmTask::ACTION_POWEROFF_VM then task_shutdown_or_destroy_vm(task, :destroy)
+                        when VmTask::ACTION_START_VM then task_start_vm(task)
+                        when VmTask::ACTION_SUSPEND_VM then task_suspend_vm(task)
+                        when VmTask::ACTION_RESUME_VM then task_resume_vm(task)
+                        when VmTask::ACTION_SAVE_VM then task_save_vm(task)
+                        when VmTask::ACTION_RESTORE_VM then task_restore_vm(task)
+                        when VmTask::ACTION_MIGRATE_VM then task_migrate_vm(task)
+                        when StorageTask::ACTION_REFRESH_POOL then task_refresh_pool(task)
+                        when StorageVolumeTask::ACTION_CREATE_VOLUME then task_create_volume(task)
+                        when StorageVolumeTask::ACTION_DELETE_VOLUME then task_delete_volume(task)
+                        when HostTask::ACTION_CLEAR_VMS then task_clear_vms_host(task)
+                    else
+                        puts "unknown task " + task.action
+                        state = Task::STATE_FAILED
+                        task.message = "Unknown task type"
+                    end
+                rescue => ex
+                    puts "Task action processing failed: #{ex.class}: #{ex.message}"
+                    puts ex.backtrace
+                    state = Task::STATE_FAILED
+                    task.message = ex.message
+                end
+
+                task.state = state
+                task.time_ended = Time.now
+                task.save!
+                puts "done"
+            end
+            # FIXME: here, we clean up "orphaned" tasks.  These are tasks that we had
+            # to orphan (set task_target to nil) because we were deleting the object they
+            # depended on.
+            Task.find(:all, :conditions => [ "task_target_id IS NULL and task_target_type IS NULL" ]).each do |task|
+                task.destroy
+            end
+            sleep(1)
+        end
+    end
 end
+
+taskomatic = TaskOmatic.new()
+taskomatic.mainloop()
+
diff --git a/src/task-omatic/utils.rb b/src/task-omatic/utils.rb
deleted file mode 100644
index e3005ed..0000000
--- a/src/task-omatic/utils.rb
+++ /dev/null
@@ -1,221 +0,0 @@
-require 'rexml/document'
-include REXML
-
-def String.random_alphanumeric(size=16)
-  s = ""
-  size.times { s << (i = Kernel.rand(62); i += ((i < 10) ? 48 : ((i < 36) ? 55 : 61 ))).chr }
-  s
-end
-
-def all_storage_pools(conn)
-  all_pools = conn.list_defined_storage_pools
-  all_pools.concat(conn.list_storage_pools)
-  return all_pools
-end
-
-def get_libvirt_lvm_pool_from_volume(db_volume)
-  phys_volume = StorageVolume.find(:first, :conditions =>
-                                   [ "lvm_pool_id = ?", db_volume.storage_pool_id])
-
-  return LibvirtPool.factory(phys_volume.storage_pool)
-end
-
-class LibvirtPool
-  def initialize(type, name = nil)
-    @remote_pool = nil
-    @build_on_start = true
-    @remote_pool_defined = false
-    @remote_pool_started = false
-
-    if name == nil
-      @name = type + "-" + String.random_alphanumeric
-    else
-      @name = name
-    end
-
-    @xml = Document.new
-    @xml.add_element("pool", {"type" => type})
-
-    @xml.root.add_element("name").add_text(@name)
-
-    @xml.root.add_element("source")
-
-    @xml.root.add_element("target")
-    @xml.root.elements["target"].add_element("path")
-  end
-
-  def connect(conn)
-    all_storage_pools(conn).each do |remote_pool_name|
-      tmppool = conn.lookup_storage_pool_by_name(remote_pool_name)
-
-      if self.xmlequal?(Document.new(tmppool.xml_desc).root)
-        @remote_pool = tmppool
-        break
-      end
-    end
-
-    if @remote_pool == nil
-      @remote_pool = conn.define_storage_pool_xml(@xml.to_s)
-      # we need this because we don't necessarily want to "build" LVM pools,
-      # which might destroy existing data
-      if @build_on_start
-        @remote_pool.build
-      end
-      @remote_pool_defined = true
-    end
-
-    if @remote_pool.info.state == Libvirt::StoragePool::INACTIVE
-      # only try to start the pool if it is currently inactive; in all other
-      # states, assume it is already running
-      @remote_pool.create
-      @remote_pool_started = true
-    end
-  end
-
-  def list_volumes
-    return @remote_pool.list_volumes
-  end
-
-  def lookup_vol_by_path(dev)
-    return @remote_pool.lookup_volume_by_path(dev)
-  end
-
-  def lookup_vol_by_name(name)
-    return @remote_pool.lookup_volume_by_name(name)
-  end
-
-  def create_vol(type, name, size, owner, group, mode)
-    @vol_xml = Document.new
-    @vol_xml.add_element("volume", {"type" => type})
-    @vol_xml.root.add_element("name").add_text(name)
-    @vol_xml.root.add_element("capacity", {"unit" => "K"}).add_text(size.to_s)
-    @vol_xml.root.add_element("target")
-    @vol_xml.root.elements["target"].add_element("permissions")
-    @vol_xml.root.elements["target"].elements["permissions"].add_element("owner").add_text(owner)
-    @vol_xml.root.elements["target"].elements["permissions"].add_element("group").add_text(group)
-    @vol_xml.root.elements["target"].elements["permissions"].add_element("mode").add_text(mode)
-  end
-
-  def shutdown
-    if @remote_pool_started
-      @remote_pool.destroy
-    end
-    if @remote_pool_defined
-      @remote_pool.undefine
-    end
-  end
-
-  def xmlequal?(docroot)
-    return false
-  end
-
-  def self.factory(pool)
-    if pool[:type] == "IscsiStoragePool"
-      return IscsiLibvirtPool.new(pool.ip_addr, pool[:target])
-    elsif pool[:type] == "NfsStoragePool"
-      return NFSLibvirtPool.new(pool.ip_addr, pool.export_path)
-    elsif pool[:type] == "LvmStoragePool"
-      # OK, if this is LVM storage, there are two cases we need to care about:
-      # 1) this is a LUN with LVM already on it.  In this case, all we need to
-      #    do is to create a new LV (== libvirt volume), and be done with it
-      # 2) this LUN is blank, so there is no LVM on it already.  In this
-      #    case, we need to pvcreate, vgcreate first (== libvirt pool build),
-      #    and *then* create the new LV (== libvirt volume) on top of that.
-      #
-      # We can tell the difference between an LVM Pool that exists and one
-      # that needs to be created based on the value of the pool.state;
-      # if it is PENDING_SETUP, we need to create it first
-      phys_volume = StorageVolume.find(:first, :conditions =>
-                                       [ "lvm_pool_id = ?", pool.id])
-
-      return LVMLibvirtPool.new(pool.vg_name, phys_volume.path,
-                                pool.state == StoragePool::STATE_PENDING_SETUP)
-    else
-      raise "Unknown storage pool type " + pool[:type].to_s
-    end
-  end
-end
-
-class IscsiLibvirtPool < LibvirtPool
-  def initialize(ip_addr, target)
-    super('iscsi')
-
-    @type = 'iscsi'
-    @ipaddr = ip_addr
-    @target = target
-
-    @xml.root.elements["source"].add_element("host", {"name" => @ipaddr})
-    @xml.root.elements["source"].add_element("device", {"path" => @target})
-
-    @xml.root.elements["target"].elements["path"].text = "/dev/disk/by-id"
-  end
-
-  def xmlequal?(docroot)
-    return (docroot.attributes['type'] == @type and
-            docroot.elements['source'].elements['host'].attributes['name'] == @ipaddr and
-            docroot.elements['source'].elements['device'].attributes['path'] == @target)
-  end
-end
-
-class NFSLibvirtPool < LibvirtPool
-  def initialize(ip_addr, export_path)
-    super('netfs')
-
-    @type = 'netfs'
-    @host = ip_addr
-    @remote_path = export_path
-    @name = String.random_alphanumeric
-
-    @xml.root.elements["source"].add_element("host", {"name" => @host})
-    @xml.root.elements["source"].add_element("dir", {"path" => @remote_path})
-    @xml.root.elements["source"].add_element("format", {"type" => "nfs"})
-
-    @xml.root.elements["target"].elements["path"].text = "/mnt/" + @name
-  end
-
-  def create_vol(name, size, owner, group, mode)
-    # FIXME: this can actually take some time to complete (since we aren't
-    # doing sparse allocations at the moment).  During that time, whichever
-    # libvirtd we chose to use is completely hung up.  The solution is 3-fold:
-    # 1.  Allow sparse allocations in the WUI front-end
-    # 2.  Make libvirtd multi-threaded
-    # 3.  Make taskomatic multi-threaded
-    super("netfs", name, size, owner, group, mode)
-
-    # FIXME: we have to add the format as raw here because of a bug in libvirt;
-    # if you specify a volume with no format, it will crash libvirtd
-    @vol_xml.root.elements["target"].add_element("format", {"type" => "raw"})
-    @remote_pool.create_vol_xml(@vol_xml.to_s)
-  end
-
-  def xmlequal?(docroot)
-    return (docroot.attributes['type'] == @type and
-            docroot.elements['source'].elements['host'].attributes['name'] == @host and
-            docroot.elements['source'].elements['dir'].attributes['path'] == @remote_path)
-  end
-end
-
-class LVMLibvirtPool < LibvirtPool
-  def initialize(vg_name, device, build_on_start)
-    super('logical', vg_name)
-
-    @type = 'logical'
-    @build_on_start = build_on_start
-
-    @xml.root.elements["source"].add_element("name").add_text(@name)
-    @xml.root.elements["source"].add_element("device", {"path" => device})
-
-    @xml.root.elements["target"].elements["path"].text = "/dev/" + @name
-  end
-
-  def create_vol(name, size, owner, group, mode)
-    super("logical", name, size, owner, group, mode)
-    @remote_pool.create_vol_xml(@vol_xml.to_s)
-  end
-
-  def xmlequal?(docroot)
-    return (docroot.attributes['type'] == @type and
-            docroot.elements['name'].text == @name and
-            docroot.elements['source'].elements['name'] == @name)
-  end
-end
-- 
1.6.0.4
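
A note on the FIXME in add_volumes_to_db about volumes that stay in the
database after they are removed from a pool (the lun-3 case): one possible
approach is to compare the names recorded in the database against the names
libvirt reports on re-scan, and only drop entries that no VM is using. The
sketch below is plain Ruby and is not part of the patch. DbVolume, its
in_use flag, stale_volumes and partition_stale are hypothetical stand-ins
for illustration, not existing oVirt model classes or methods.

```ruby
# Hypothetical stand-in for a database volume row; in_use marks a volume
# that is still attached to a VM (an assumption for this sketch).
DbVolume = Struct.new(:name, :in_use)

# Return the database entries whose names libvirt no longer reports.
def stale_volumes(db_volumes, pool_volume_names)
  db_volumes.reject { |vol| pool_volume_names.include?(vol.name) }
end

# Split stale entries into [safe to remove, still attached to a VM].
def partition_stale(db_volumes, pool_volume_names)
  stale_volumes(db_volumes, pool_volume_names).partition { |vol| !vol.in_use }
end

# The scenario from the FIXME: lun-3 was removed from the pool before re-scan.
db = [DbVolume.new("lun-1", false),
      DbVolume.new("lun-2", true),
      DbVolume.new("lun-3", false)]
removable, attached = partition_stale(db, ["lun-1", "lun-2"])
puts "removable: #{removable.map(&:name).inspect}"
puts "still attached: #{attached.map(&:name).inspect}"
```

Keeping the "still attached" list separate leaves the open question from the
FIXME (whether to pull a database entry out from under a running VM) as an
explicit policy decision rather than something the scan decides implicitly.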



