[Ovirt-devel] [PATCH server] Taskomatic Refactoring and Qpidification Take 4

Ian Main imain at redhat.com
Fri Jan 23 05:21:41 UTC 2009


This version incorporates feedback from Chris Lalancette.

This patch reworks taskomatic quite a bit.  It mostly just shifts
taskomatic to using the qpid interface in place of ruby-libvirt.  It
also fixes a few bugs I discovered along the way and, I'm sure, adds
new ones.  The only other thing added was round-robin host selection
for VMs.
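
The selection itself is just a rotating index over the candidate list;
roughly this (the real find_capable_host in the diff also filters on
cores, memory and the disabled flag first):

    # Pick the next candidate in round-robin order; @nth_host is a
    # simple counter kept on the TaskOmatic instance.
    host = possible_hosts[@nth_host % possible_hosts.length]
    @nth_host += 1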

Wherever possible the hosts are queried directly using qpid rather than
relying on state from the database.
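
For example, instead of trusting the cached state columns, the code asks
libvirt-qpid through QMF; the pattern used throughout the patch looks
roughly like this (host here is a host row from the database):

    session = Qpid::Qmf::Session.new()
    session.add_broker("amqp://localhost:5672")

    # Look the node up on the broker, then ask which domains it is
    # actually running, rather than reading that out of the database.
    node = session.object(:class => 'node', 'hostname' => host.hostname)
    vms  = session.objects(:class => 'domain', 'node' => node.object_id)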

This patch loses about 150 lines from the original taskomatic and moves
most of the task implementation into a central class.  This was done to
give the tasks access to the qpid session and to allow for locking/task
ordering in future versions.
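
The shape of that central class, abridged from the diff below (the
MonitorMixin include is what leaves room for locking later):

    class TaskOmatic
      include MonitorMixin

      def initialize()
        super()
        @session = Qpid::Qmf::Session.new()
        @broker  = @session.add_broker("amqp://localhost:5672")
      end

      # All the task_* methods (start/stop/migrate/storage scan) hang
      # off this class so they share @session.
    end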

This requires the latest libvirt-qpid as it fixes a number of bugs.
It's in the ovirt repository now.

Issues remaining:

- libvirt-qpid migrate is broken.  Since the migration takes place on
  the node instead of on the ovirt appliance, the source node doesn't
  have the ability to authenticate against the destination node.  For
  this reason I'm still using ruby-libvirt for the migrate.  I talked
  to Chris about this and we have a plan worked out. :)
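
  In the meantime the migrate path still drives the live migration from
  the appliance with ruby-libvirt, roughly:

      src_conn = Libvirt::open("qemu+tcp://" + src_node.hostname + "/system")
      dst_conn = Libvirt::open("qemu+tcp://" + dest_node.hostname + "/system")
      dom = src_conn.lookup_domain_by_uuid(vm.uuid)
      dom.migrate(dst_conn, Libvirt::Domain::MIGRATE_LIVE)
      src_conn.close
      dst_conn.close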

- LVM volume scanning is not in this version.  I intend to address this
  asap.

Signed-off-by: Ian Main <imain at redhat.com>
---
 src/task-omatic/task_storage.rb |  475 +++++++++++-----------
 src/task-omatic/task_vm.rb      |  575 +---------------------------
 src/task-omatic/taskomatic.rb   |  825 ++++++++++++++++++++++++++++++++++-----
 3 files changed, 971 insertions(+), 904 deletions(-)

diff --git a/src/task-omatic/task_storage.rb b/src/task-omatic/task_storage.rb
index 19800fb..6465308 100644
--- a/src/task-omatic/task_storage.rb
+++ b/src/task-omatic/task_storage.rb
@@ -1,5 +1,6 @@
 # Copyright (C) 2008 Red Hat, Inc.
 # Written by Chris Lalancette <clalance at redhat.com>
+# and Ian Main <imain at redhat.com>
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -16,287 +17,285 @@
 # MA  02110-1301, USA.  A copy of the GNU General Public License is
 # also available at http://www.gnu.org/copyleft/gpl.html.
 
-require 'utils'
-
 require 'libvirt'
+require 'rexml/document'
+include REXML
+
+def String.random_alphanumeric(size=16)
+  s = ""
+  size.times { s << (i = Kernel.rand(62); i += ((i < 10) ? 48 : ((i < 36) ? 55 : 61 ))).chr }
+  s
+end
+
+def get_libvirt_lvm_pool_from_volume(db_volume)
+  phys_volume = StorageVolume.find(:first, :conditions =>
+                                   ["lvm_pool_id = ?", db_volume.storage_pool_id])
+
+  return LibvirtPool.factory(phys_volume.storage_pool)
+end
+
+
+def task_storage_cobbler_setup(db_vm)
+
+  image_volume = nil
 
-def add_volumes_to_db(db_pool, libvirt_pool, owner = nil, group = nil, mode = nil)
-  # FIXME: this is currently broken if you do something like:
-  # 1.  Add an iscsi pool with 3 volumes (lun-1, lun-2, lun-3)
-  # 2.  Scan it in
-  # 3.  Remove lun-3 from the pool
-  # 4.  Re-scan it
-  # What will happen is that you will still have lun-3 available in the
-  # database, even though it's not available in the pool anymore.  It's a
-  # little tricky, though; we have to make sure that we don't pull the
-  # database entry out from underneath a possibly running VM (or do we?)
-  libvirt_pool.list_volumes.each do |volname|
-    storage_volume = StorageVolume.factory(db_pool.get_type_label)
-
-    # NOTE: it is safe (and, in fact, necessary) to use
-    # #{storage_volume.volume_name} here without sanitizing it.  This is
-    # because this is *not* based on user modifiable data, but rather, on an
-    # internal implementation detail
-    existing_vol = StorageVolume.find(:first, :conditions =>
-                                      [ "storage_pool_id = ? AND #{storage_volume.volume_name} = ?",
-                                        db_pool.id, volname])
-    if existing_vol != nil
-      # in this case, this path already exists in the database; just skip
-      next
+  if (db_vm.boot_device == Vm::BOOT_DEV_CDROM) &&
+      db_vm.uses_cobbler? && (db_vm.cobbler_type == Vm::IMAGE_PREFIX)
+
+    details = Cobbler::Image.find_one(db_vm.cobbler_name)
+    raise "Image #{vm.cobbler_name} not found in Cobbler server" unless details
+
+    # extract the components of the image filename
+    image_uri = details.file
+    protocol = auth = ip_addr = export_path = filename = ""
+
+    protocol, image_uri = image_uri.split("://") if image_uri.include?("://")
+    auth, image_uri = image_uri.split("@") if image_uri.include?("@")
+
+    # it's ugly, but string.split returns an empty string as the first
+    # result here, so we'll just ignore it
+    ignored, ip_addr, image_uri =
+        image_uri.split(/^([^\/]+)(\/.*)/) unless image_uri =~ /^\//
+    ignored, export_path, filename =
+        image_uri.split(/^(.*)\/(.+)/)
+
+    found = false
+
+    db_vm.storage_volumes.each do |volume|
+      if volume.filename == filename
+        if (volume.storage_pool.ip_addr == ip_addr) &&
+          (volume.storage_pool.export_path == export_path)
+          found = true
+        end
+      end
     end
 
-    volptr = libvirt_pool.lookup_vol_by_name(volname)
+    unless found
+      # Create a new transient NFS storage volume
+      # This volume is *not* persisted.
+      image_volume = StorageVolume.factory("NFS", :filename => filename)
 
-    volinfo = volptr.info
+      image_volume.storage_pool
+      image_pool = StoragePool.factory(StoragePool::NFS)
 
-    storage_volume = StorageVolume.factory(db_pool.get_type_label)
-    storage_volume.path = volptr.path
-    storage_volume.size = volinfo.capacity / 1024
-    storage_volume.storage_pool_id = db_pool.id
-    storage_volume.write_attribute(storage_volume.volume_name, volname)
-    storage_volume.lv_owner_perms = owner
-    storage_volume.lv_group_perms = group
-    storage_volume.lv_mode_perms = mode
-    storage_volume.state = StorageVolume::STATE_AVAILABLE
-    storage_volume.save!
+      image_pool.ip_addr = ip_addr
+      image_pool.export_path = export_path
+      image_pool.storage_volumes << image_volume
+      image_volume.storage_pool = image_pool
+    end
   end
+
+  return image_volume
 end
 
-def storage_find_suitable_host(hardware_pool)
-  conn = nil
-  hardware_pool.hosts.each do |host|
-    if not host.is_disabled.nil? and host.is_disabled == 0 \
-      and host.state == Host::STATE_AVAILABLE
-      begin
-        # FIXME: this can hang up taskomatic for quite some time.  To see how,
-        # make one of your remote servers do "iptables -I INPUT -j DROP"
-        # and then try to run this; it will take TCP quite a while to give up.
-        # Unfortunately the solution is probably to do some sort of threading
-        conn = Libvirt::open("qemu+tcp://" + host.hostname + "/system")
-
-        # if we didn't raise an exception, we connected; get out of here
+class LibvirtPool
+
+  attr_reader :remote_pool
+
+  def initialize(type, name = nil)
+    @remote_pool = nil
+    @build_on_start = true
+    @remote_pool_defined = false
+    @remote_pool_started = false
+
+    if name == nil
+      @name = type + "-" + String.random_alphanumeric
+    else
+      @name = name
+    end
+
+    @xml = Document.new
+    @xml.add_element("pool", {"type" => type})
+
+    @xml.root.add_element("name").add_text(@name)
+
+    @xml.root.add_element("source")
+
+    @xml.root.add_element("target")
+    @xml.root.elements["target"].add_element("path")
+  end
+
+  def connect(session, node)
+    pools = session.objects(:class => 'pool', 'node' => node.object_id)
+    pools.each do |pool|
+      result = pool.getXMLDesc
+      raise "Error getting xml description of pool: #{result.text}" unless result.status == 0
+
+      xml_desc = result.description
+      if self.xmlequal?(Document.new(xml_desc).root)
+        @remote_pool = pool
         break
-      rescue Libvirt::ConnectionError
-        # if we couldn't connect for whatever reason, just try the next host
-        next
       end
     end
-  end
 
-  if conn == nil
-    # last ditch effort; if we didn't find any hosts, just use ourselves.
-    # this may or may not work
-    begin
-      conn = Libvirt::open("qemu:///system")
-    rescue
+    if @remote_pool == nil
+      result = node.storagePoolDefineXML(@xml.to_s)
+      raise "Error creating pool: #{result.text}" unless result.status == 0
+      @remote_pool = session.object(:object_id => result.pool)
+      raise "Error finding newly created remote pool." unless @remote_pool
+
+      # we need this because we don't want to "build" LVM pools, which would
+      # destroy existing data
+      if @build_on_start
+        result = @remote_pool.build
+        raise "Error building pool: #{result.text}" unless result.status == 0
+      end
+      @remote_pool_defined = true
     end
-  end
 
-  if conn == nil
-    raise "Could not find a host to scan storage"
-  end
+    # FIXME: I'm not sure.. it seems like there could be other things going on
+    # with the storage pool state.  State can be inactive, building, running
+    # or degraded.  I think some more thought should go here to make sure
+    # we're doing things right in each state.
 
-  return conn
-end
+    if @remote_pool.state == "inactive"
+      # only try to start the pool if it is currently inactive; in all other
+      # states, assume it is already running
+      result = @remote_pool.create
+      raise "Error defining pool: #{result.text}" unless result.status == 0
 
-# The words "pool" and "volume" are ridiculously overloaded in our context.
-# Therefore, the refresh_pool method adopts this convention:
-# phys_db_pool: The underlying physical storage pool, as it is represented in
-#               the database
-# phys_libvirt_pool: The underlying physical storage, as it is represented in
-#                    libvirt
-# lvm_db_pool: The logical storage pool (if it exists), as it is represented
-#              in the database
-# lvm_libvirt_pool: The logical storage pool (if it exists), as it is
-#                   represented in the database
-
-def refresh_pool(task)
-  puts "refresh_pool"
-
-  phys_db_pool = task.storage_pool
-  if phys_db_pool == nil
-    raise "Could not find storage pool"
-  end
+      # Refresh qpid object with new properties.
+      @remote_pool.update
 
-  conn = storage_find_suitable_host(phys_db_pool.hardware_pool)
-
-  begin
-    phys_libvirt_pool = LibvirtPool.factory(phys_db_pool)
-    phys_libvirt_pool.connect(conn)
-
-    begin
-      # OK, the pool is all set.  Add in all of the volumes
-      add_volumes_to_db(phys_db_pool, phys_libvirt_pool)
-
-      phys_db_pool.state = StoragePool::STATE_AVAILABLE
-      phys_db_pool.save!
-
-      # OK, now we've scanned the underlying hardware pool and added the
-      # volumes.  Next we scan for pre-existing LVM volumes
-      logical_xml = conn.discover_storage_pool_sources("logical")
-
-      Document.new(logical_xml).elements.each('sources/source') do |source|
-        vgname = source.elements["name"].text
-
-        begin
-          source.elements.each("device") do |device|
-            byid_device = phys_libvirt_pool.lookup_vol_by_path(device.attributes["path"]).path
-          end
-        rescue
-          # If matching any of the <device> sections in the LVM XML fails
-          # against the storage pool, then it is likely that this is a storage
-          # pool not associated with the one we connected above.  Go on
-          # FIXME: it would be nicer to catch the right exception here, and
-          # fail on other exceptions
-          puts "One of the logical volumes in #{vgname} is not part of the pool of type #{phys_db_pool[:type]} that we are scanning; ignore the previous error!"
-          next
-        end
+      @remote_pool_started = true
+    end
+  end
 
-        # if we make it here, then we were able to resolve all of the devices,
-        # so we know we need to use a new pool
-        lvm_db_pool = LvmStoragePool.find(:first, :conditions =>
-                                          [ "vg_name = ?", vgname ])
-        if lvm_db_pool == nil
-          lvm_db_pool = LvmStoragePool.new
-          lvm_db_pool[:type] = "LvmStoragePool"
-          # set the LVM pool to the same hardware pool as the underlying storage
-          lvm_db_pool.hardware_pool_id = phys_db_pool.hardware_pool_id
-          lvm_db_pool.vg_name = vgname
-          lvm_db_pool.save!
-        end
+  def create_vol(type, name, size, owner, group, mode)
+    @vol_xml = Document.new
+    @vol_xml.add_element("volume", {"type" => type})
+    @vol_xml.root.add_element("name").add_text(name)
+    @vol_xml.root.add_element("capacity", {"unit" => "K"}).add_text(size.to_s)
+    @vol_xml.root.add_element("target")
+    @vol_xml.root.elements["target"].add_element("permissions")
+    @vol_xml.root.elements["target"].elements["permissions"].add_element("owner").add_text(owner)
+    @vol_xml.root.elements["target"].elements["permissions"].add_element("group").add_text(group)
+    @vol_xml.root.elements["target"].elements["permissions"].add_element("mode").add_text(mode)
+  end
 
-        source.elements.each("device") do |device|
-          byid_device = phys_libvirt_pool.lookup_vol_by_path(device.attributes["path"]).path
-          physical_vol = StorageVolume.find(:first, :conditions =>
-                                            [ "path = ?",  byid_device])
-          if physical_vol == nil
-            # Hm. We didn't find the device in the storage volumes already.
-            # something went wrong internally, and we have to bail
-            raise "Storage internal physical volume error"
-          end
-
-          # OK, put the right lvm_pool_id in place
-          physical_vol.lvm_pool_id = lvm_db_pool.id
-          physical_vol.save!
-        end
+  def shutdown
+    if @remote_pool_started
+      result = @remote_pool.destroy
+    end
+    if @remote_pool_defined
+      result = @remote_pool.undefine
+    end
+  end
 
-        lvm_libvirt_pool = LibvirtPool.factory(lvm_db_pool)
-        lvm_libvirt_pool.connect(conn)
+  def xmlequal?(docroot)
+    return false
+  end
 
-        begin
-          add_volumes_to_db(lvm_db_pool, lvm_libvirt_pool, "0744", "0744", "0744")
-        ensure
-          lvm_libvirt_pool.shutdown
-        end
-      end
-    ensure
-      phys_libvirt_pool.shutdown
+  def self.factory(pool)
+    if pool[:type] == "IscsiStoragePool"
+      return IscsiLibvirtPool.new(pool.ip_addr, pool[:target])
+    elsif pool[:type] == "NfsStoragePool"
+      return NFSLibvirtPool.new(pool.ip_addr, pool.export_path)
+    elsif pool[:type] == "LvmStoragePool"
+      # OK, if this is LVM storage, there are two cases we need to care about:
+      # 1) this is a LUN with LVM already on it.  In this case, all we need to
+      #  do is to create a new LV (== libvirt volume), and be done with it
+      # 2) this LUN is blank, so there is no LVM on it already.  In this
+      #  case, we need to pvcreate, vgcreate first (== libvirt pool build),
+      #  and *then* create the new LV (== libvirt volume) on top of that.
+      #
+      # We can tell the difference between an LVM Pool that exists and one
+      # that needs to be created based on the value of the pool.state;
+      # if it is PENDING_SETUP, we need to create it first
+      phys_volume = StorageVolume.find(:first, :conditions =>
+                                       [ "lvm_pool_id = ?", pool.id])
+      return LVMLibvirtPool.new(pool.vg_name, phys_volume.path,
+                                pool.state == StoragePool::STATE_PENDING_SETUP)
+    else
+      raise "Unknown storage pool type " + pool[:type].to_s
     end
-  ensure
-    conn.close
   end
 end
 
-def create_volume(task)
-  puts "create_volume"
+class IscsiLibvirtPool < LibvirtPool
+  def initialize(ip_addr, target)
+    super('iscsi')
+
+    @type = 'iscsi'
+    @ipaddr = ip_addr
+    @target = target
 
-  db_volume = task.storage_volume
-  if db_volume == nil
-    raise "Could not find storage volume to create"
+    @xml.root.elements["source"].add_element("host", {"name" => @ipaddr})
+    @xml.root.elements["source"].add_element("device", {"path" => @target})
+
+    @xml.root.elements["target"].elements["path"].text = "/dev/disk/by-id"
   end
 
-  db_pool = db_volume.storage_pool
-  if db_pool == nil
-    raise "Could not find storage pool"
+  def xmlequal?(docroot)
+    return (docroot.attributes['type'] == @type and
+        docroot.elements['source'].elements['host'].attributes['name'] == @ipaddr and
+        docroot.elements['source'].elements['device'].attributes['path'] == @target)
   end
+end
 
-  conn = storage_find_suitable_host(db_pool.hardware_pool)
+class NFSLibvirtPool < LibvirtPool
+  def initialize(ip_addr, export_path)
+    super('netfs')
 
-  begin
-    if db_volume[:type] == "LvmStorageVolume"
-      phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
-      phys_libvirt_pool.connect(conn)
-    end
+    @type = 'netfs'
+    @host = ip_addr
+    @remote_path = export_path
 
-    begin
-      libvirt_pool = LibvirtPool.factory(db_pool)
+    @xml.root.elements["source"].add_element("host", {"name" => @host})
+    @xml.root.elements["source"].add_element("dir", {"path" => @remote_path})
+    @xml.root.elements["source"].add_element("format", {"type" => "nfs"})
 
-      begin
-        libvirt_pool.connect(conn)
+    @xml.root.elements["target"].elements["path"].text = "/mnt/" + @name
+  end
 
-        libvirt_pool.create_vol(*db_volume.volume_create_params)
-        db_volume.state = StorageVolume::STATE_AVAILABLE
-        db_volume.save!
+  def create_vol(name, size, owner, group, mode)
+    # FIXME: this can actually take some time to complete (since we aren't
+    # doing sparse allocations at the moment).  During that time, whichever
+    # libvirtd we chose to use is completely hung up.  The solution is 3-fold:
+    # 1.  Allow sparse allocations in the WUI front-end
+    # 2.  Make libvirtd multi-threaded
+    # 3.  Make taskomatic multi-threaded
+    super("netfs", name, size, owner, group, mode)
+
+    # FIXME: we have to add the format as raw here because of a bug in libvirt;
+    # if you specify a volume with no format, it will crash libvirtd
+    @vol_xml.root.elements["target"].add_element("format", {"type" => "raw"})
+    result = @remote_pool.createVolumeXML(@vol_xml.to_s)
+    raise "Error creating remote pool: #{result.text}" unless result.status == 0
+    return result.volume
+  end
 
-        db_pool.state = StoragePool::STATE_AVAILABLE
-        db_pool.save!
-      ensure
-        libvirt_pool.shutdown
-      end
-    ensure
-      if db_volume[:type] == "LvmStorageVolume"
-        phys_libvirt_pool.shutdown
-      end
-    end
-  ensure
-    conn.close
+  def xmlequal?(docroot)
+    return (docroot.attributes['type'] == @type and
+        docroot.elements['source'].elements['host'].attributes['name'] == @host and
+        docroot.elements['source'].elements['dir'].attributes['path'] == @remote_path)
   end
 end
 
-def delete_volume(task)
-  puts "delete_volume"
+class LVMLibvirtPool < LibvirtPool
+  def initialize(vg_name, device, build_on_start)
+    super('logical', vg_name)
 
-  db_volume = task.storage_volume
-  if db_volume == nil
-    raise "Could not find storage volume to create"
-  end
+    @type = 'logical'
+    @build_on_start = build_on_start
 
-  db_pool = db_volume.storage_pool
-  if db_pool == nil
-    raise "Could not find storage pool"
+    @xml.root.elements["source"].add_element("name").add_text(@name)
+    @xml.root.elements["source"].add_element("device", {"path" => device})
+    @xml.root.elements["target"].elements["path"].text = "/dev/" + @name
   end
 
-  conn = storage_find_suitable_host(db_pool.hardware_pool)
-
-  begin
-    if db_volume[:type] == "LvmStorageVolume"
-      phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
-      phys_libvirt_pool.connect(conn)
-    end
+  def create_vol(name, size, owner, group, mode)
+    super("logical", name, size, owner, group, mode)
+    result = @remote_pool.createVolumeXML(@vol_xml.to_s)
+    raise "Error creating remote pool: #{result.text}" unless result.status == 0
+    return result.volume
+  end
 
-    begin
-      libvirt_pool = LibvirtPool.factory(db_pool)
-      libvirt_pool.connect(conn)
-
-      begin
-        libvirt_volume = libvirt_pool.lookup_vol_by_name(db_volume.read_attribute(db_volume.volume_name))
-        # FIXME: we actually probably want to zero out the whole volume here, so
-        # we aren't potentially leaking data from one user to another.  There
-        # are two problems, though:
-        # 1)  I'm not sure how I would go about zero'ing the data on a remote
-        # machine, since there is no "libvirt_write_data" call
-        # 2)  This could potentially take quite a while, so we want to spawn
-        # off another thread to do it
-        libvirt_volume.delete
-
-        # Note: we have to nil out the task_target because when we delete the
-        # volume object, that also deletes all dependent tasks (including this
-        # one), which leads to accessing stale tasks.  Orphan the task, then
-        # delete the object; we can clean up orphans later (or not, depending
-        # on the audit policy)
-        task.task_target = nil
-        task.save!
-
-        db_volume.destroy
-      ensure
-        libvirt_pool.shutdown
-      end
-    ensure
-      if db_volume[:type] == "LvmStorageVolume"
-        phys_libvirt_pool.shutdown
-      end
-    end
-  ensure
-    conn.close
+  def xmlequal?(docroot)
+    return (docroot.attributes['type'] == @type and
+        docroot.elements['name'].text == @name and
+        docroot.elements['source'].elements['name'] and
+        docroot.elements['source'].elements['name'].text == @name)
   end
 end
+
diff --git a/src/task-omatic/task_vm.rb b/src/task-omatic/task_vm.rb
index cbeda20..74cf862 100644
--- a/src/task-omatic/task_vm.rb
+++ b/src/task-omatic/task_vm.rb
@@ -19,35 +19,10 @@
 require 'rexml/document'
 include REXML
 
-require 'utils'
-
 gem 'cobbler'
 require 'cobbler'
 
-def findHostSLA(vm)
-  host = nil
-
-  vm.vm_resource_pool.get_hardware_pool.hosts.each do |curr|
-    # FIXME: we probably need to add in some notion of "load" into this check
-    if curr.num_cpus >= vm.num_vcpus_allocated \
-      and curr.memory >= vm.memory_allocated \
-      and not curr.is_disabled.nil? and curr.is_disabled == 0 \
-      and curr.state == Host::STATE_AVAILABLE \
-      and (vm.host_id.nil? or (not vm.host_id.nil? and vm.host_id != curr.id))
-      host = curr
-      break
-    end
-  end
-
-  if host == nil
-    # we couldn't find a host that matches this criteria
-    raise "No host matching VM parameters could be found"
-  end
-
-  return host
-end
-
-def findHost(host_id)
+def find_host(host_id)
   host = Host.find(:first, :conditions => [ "id = ?", host_id])
 
   if host == nil
@@ -58,75 +33,6 @@ def findHost(host_id)
   return host
 end
 
-def connect_storage_pools(conn, storage_volumes)
-  storagedevs = []
-  storage_volumes.each do |volume|
-    # here, we need to iterate through each volume and possibly attach it
-    # to the host we are going to be using
-    db_pool = volume.storage_pool
-    if db_pool == nil
-      # Hum.  Specified by the VM description, but not in the storage pool?
-      # continue on and hope for the best
-      puts "Couldn't find pool for volume #{volume.path}; skipping"
-      next
-    end
-
-    # we have to special case LVM pools.  In that case, we need to first
-    # activate the underlying physical device, and then do the logical one
-    if volume[:type] == "LvmStorageVolume"
-      phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(volume)
-      phys_libvirt_pool.connect(conn)
-    end
-
-    libvirt_pool = LibvirtPool.factory(db_pool)
-    libvirt_pool.connect(conn)
-
-    # OK, the pool should be all set.  The last thing we need to do is get
-    # the path based on the volume name
-    storagedevs << libvirt_pool.lookup_vol_by_name(volume.read_attribute(volume.volume_name)).path
-  end
-
-  return storagedevs
-end
-
-def remove_pools(conn, type = nil)
-  all_storage_pools(conn).each do |remote_pool_name|
-    pool = conn.lookup_storage_pool_by_name(remote_pool_name)
-
-    if type == nil or type == Document.new(pool.xml_desc).root.attributes['type']
-      begin
-        pool.destroy
-      rescue
-      end
-
-      begin
-        # if the destroy failed, we still try to undefine; it may be a pool
-        # that was previously destroyed but not undefined for whatever reason
-        pool.undefine
-      rescue
-        # do nothing if any of this failed; the worst that happens is that
-        # we leave a pool configured
-        puts "Could not teardown pool " + remote_pool_name + "; skipping"
-      end
-    end
-  end
-end
-
-def teardown_storage_pools(conn)
-  # FIXME: this needs to get a *lot* smarter.  In particular, we want to make
-  # sure we can tear down unused pools even when there are other guests running
-  if conn.list_domains.empty?
-    # OK, there are no running guests on this host anymore.  We can teardown
-    # any storage pools that are there without fear
-
-    # we first have to tear-down LVM pools, because they might depend on the
-    # underlying physical pools
-    remove_pools(conn, "logical")
-
-    # now tear down the rest of the pools
-    remove_pools(conn)
-  end
-end
 
 def create_vm_xml(name, uuid, memAllocated, memUsed, vcpus, bootDevice,
                   macAddr, bridge, diskDevices)
@@ -195,13 +101,14 @@ def create_vm_xml(name, uuid, memAllocated, memUsed, vcpus, bootDevice,
   return doc
 end
 
-def setVmState(vm, state)
+def set_vm_state(vm, state)
+  vm.reload
   vm.state = state
   vm.save!
 end
 
-def setVmVncPort(vm, domain)
-  doc = REXML::Document.new(domain.xml_desc)
+def set_vm_vnc_port(vm, xml_desc)
+  doc = REXML::Document.new(xml_desc)
   attrib = REXML::XPath.match(doc, "//graphics/@port")
   if not attrib.empty?:
     vm.vnc_port = attrib.to_s.to_i
@@ -209,32 +116,22 @@ def setVmVncPort(vm, domain)
   vm.save!
 end
 
-def findVM(task, fail_on_nil_host_id = true)
+def find_vm(task, fail_on_nil_host_id = true)
   # find the matching VM in the vms table
   vm = task.vm
 
   if vm == nil
-    raise "VM not found for task " + task.id
+    raise "VM #{task.vm} not found for task #{task.id}"
   end
 
   if vm.host_id == nil && fail_on_nil_host_id
-    # in this case, we have no idea where the VM is.  How can we handle this
-    # gracefully?  We don't necessarily want to just set the VM state to off;
-    # if the machine does happen to be running somewhere and we set it to
-    # disabled here, and then start it again, we could corrupt the disk
-
-    # FIXME: the right thing to do here is probably to contact all of the
-    # hosts we know about and ensure that the domain isn't running; then we
-    # can mark it either as off (if we didn't find it), or mark the correct
-    # vm.host_id if we did.  However, if you have a large number of hosts
-    # out there, this could take a while.
     raise "No host_id for VM " + vm.id.to_s
   end
 
   return vm
 end
 
-def setVmShutdown(vm)
+def set_vm_shut_down(vm)
   vm.host_id = nil
   vm.memory_used = nil
   vm.num_vcpus_used = nil
@@ -244,459 +141,3 @@ def setVmShutdown(vm)
   vm.save!
 end
 
-def create_vm(task)
-  puts "create_vm"
-
-  vm = findVM(task, false)
-
-  if vm.state != Vm::STATE_PENDING
-    raise "VM not pending"
-  end
-  setVmState(vm, Vm::STATE_CREATING)
-
-  # create cobbler system profile
-  begin
-    # FIXME: Presently the wui handles all cobbler system creation.
-    # This should be moved out of the wui into Taskomatic.  Specifically
-    # here, and in the edit_vm methods.
-
-    setVmState(vm, Vm::STATE_STOPPED)
-  rescue Exception => error
-    setVmState(vm, Vm::STATE_CREATE_FAILED)
-    raise "Unable to create system: #{error.message}"
-  end
-end
-
-def shut_or_destroy_vm(task, which)
-  # here, we are given an id for a VM to shutdown; we have to lookup which
-  # physical host it is running on
-
-  vm = findVM(task)
-
-  if vm.state == Vm::STATE_STOPPED
-    # the VM is already shutdown; just return success
-    setVmShutdown(vm)
-    return
-  elsif vm.state == Vm::STATE_SUSPENDED
-    raise "Cannot shutdown suspended domain"
-  elsif vm.state == Vm::STATE_SAVED
-    raise "Cannot shutdown saved domain"
-  end
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_STOPPING)
-
-  begin
-    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
-    dom = conn.lookup_domain_by_uuid(vm.uuid)
-    dom.send(which)
-
-    begin
-      dom.undefine
-    rescue
-      # undefine can fail, for instance, if we live migrated from A -> B, and
-      # then we are shutting down the VM on B (because it only has "transient"
-      # XML).  Therefore, just ignore undefine errors so we do the rest
-      # FIXME: we really should have a marker in the database somehow so that
-      # we can tell if this domain was migrated; that way, we can tell the
-      # difference between a real undefine failure and one because of migration
-    end
-
-    teardown_storage_pools(conn)
-
-    conn.close
-  rescue => ex
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
-
-  setVmShutdown(vm)
-end
-
-def shutdown_vm(task)
-  puts "shutdown_vm"
-  shut_or_destroy_vm(task, "shutdown")
-end
-
-def poweroff_vm(task)
-  puts "poweroff_vm"
-  shut_or_destroy_vm(task, "destroy")
-end
-
-def start_vm(task)
-  puts "start_vm"
-
-  # here, we are given an id for a VM to start
-
-  vm = findVM(task, false)
-
-  if vm.state == Vm::STATE_RUNNING
-    # the VM is already running; just return success
-    return
-  elsif vm.state == Vm::STATE_SUSPENDED
-    raise "Cannot start suspended domain"
-  elsif vm.state == Vm::STATE_SAVED
-    raise "Cannot start saved domain"
-  end
-
-  # FIXME: Validate that the VM is still within quota
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_STARTING)
-
-  begin
-    if vm.host_id != nil
-      # OK, marked in the database as already running on a host; for now, we
-      # will just fail the operation
-
-      # FIXME: we probably want to go out to the host it is marked on and check
-      # things out, just to make sure things are consistent
-      raise "VM already running"
-    end
-
-    # OK, now that we found the VM, go looking in the hardware_pool
-    # hosts to see if there is a host that will fit these constraints
-    host = findHostSLA(vm)
-
-    # if we're booting from a CDROM the VM is an image,
-    # then we need to add the NFS mount as a storage volume for this
-    # boot
-    #
-    if (vm.boot_device == Vm::BOOT_DEV_CDROM) && vm.uses_cobbler? && (vm.cobbler_type == Vm::IMAGE_PREFIX)
-      details = Cobbler::Image.find_one(vm.cobbler_name)
-
-      raise "Image #{vm.cobbler_name} not found in Cobbler server" unless details
-
-      # extract the components of the image filename
-      image_uri = details.file
-      protocol = auth = ip_addr = export_path = filename = ""
-
-      protocol, image_uri = image_uri.split("://") if image_uri.include?("://")
-      auth, image_uri = image_uri.split("@") if image_uri.include?("@")
-      # it's ugly, but string.split returns an empty string as the first
-      # result here, so we'll just ignore it
-      ignored, ip_addr, image_uri =
-	image_uri.split(/^([^\/]+)(\/.*)/) unless image_uri =~ /^\//
-      ignored, export_path, filename =
-	image_uri.split(/^(.*)\/(.+)/)
-
-      found = false
-
-      vm.storage_volumes.each do |volume|
-        if volume.filename == filename
-          if (volume.storage_pool.ip_addr == ip_addr) &&
-          (volume.storage_pool.export_path == export_path)
-            found = true
-          end
-        end
-      end
-
-      unless found
-        # Create a new transient NFS storage volume
-        # This volume is *not* persisted.
-        image_volume = StorageVolume.factory("NFS",
-          :filename => filename
-        )
-
-        image_volume.storage_pool
-        image_pool = StoragePool.factory(StoragePool::NFS)
-
-        image_pool.ip_addr = ip_addr
-        image_pool.export_path = export_path
-        image_pool.storage_volumes << image_volume
-        image_volume.storage_pool = image_pool
-      end
-    end
-
-    volumes = []
-    volumes += vm.storage_volumes
-    volumes << image_volume if image_volume
-
-    conn = Libvirt::open("qemu+tcp://" + host.hostname + "/system")
-
-    begin
-      storagedevs = connect_storage_pools(conn, volumes)
-
-      dom = nil
-      begin
-        # FIXME: get rid of the hardcoded bridge
-        xml = create_vm_xml(vm.description, vm.uuid, vm.memory_allocated,
-                            vm.memory_used, vm.num_vcpus_allocated,
-                            vm.boot_device, vm.vnic_mac_addr, "ovirtbr0",
-                            storagedevs)
-        dom = conn.define_domain_xml(xml.to_s)
-        dom.create
-
-        setVmVncPort(vm, dom)
-      rescue
-        if dom != nil
-          dom.undefine
-        end
-        teardown_storage_pools(conn)
-        raise ex
-      end
-    ensure
-      conn.close
-    end
-  rescue => ex
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
-
-  vm.host_id = host.id
-  vm.state = Vm::STATE_RUNNING
-  vm.memory_used = vm.memory_allocated
-  vm.num_vcpus_used = vm.num_vcpus_allocated
-  vm.boot_device = Vm::BOOT_DEV_HD
-  vm.save!
-end
-
-def save_vm(task)
-  puts "save_vm"
-
-  # here, we are given an id for a VM to suspend
-
-  vm = findVM(task)
-
-  if vm.state == Vm::STATE_SAVED
-    # the VM is already saved; just return success
-    return
-  elsif vm.state == Vm::STATE_SUSPENDED
-    raise "Cannot save suspended domain"
-  elsif vm.state == Vm::STATE_STOPPED
-    raise "Cannot save shutdown domain"
-  end
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_SAVING)
-
-  begin
-    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
-    dom = conn.lookup_domain_by_uuid(vm.uuid)
-    dom.save("/tmp/" + vm.uuid + ".save")
-    conn.close
-  rescue => ex
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
-
-  # note that we do *not* reset the host_id here, since we stored the saved
-  # vm state information locally.  restore_vm will pick it up from here
-
-  # FIXME: it would be much nicer to be able to save the VM and remove the
-  # the host_id and undefine the XML; that way we could resume it on another
-  # host later.  This can be done once we have the storage APIs, but it will
-  # need more work
-
-  setVmState(vm, Vm::STATE_SAVED)
-end
-
-def restore_vm(task)
-  puts "restore_vm"
-
-  # here, we are given an id for a VM to start
-
-  vm = findVM(task)
-
-  if vm.state == Vm::STATE_RUNNING
-    # the VM is already saved; just return success
-    return
-  elsif vm.state == Vm::STATE_SUSPENDED
-    raise "Cannot restore suspended domain"
-  elsif vm.state == Vm::STATE_STOPPED
-    raise "Cannot restore shutdown domain"
-  end
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_RESTORING)
-
-  begin
-    # FIXME: we should probably go out to the host and check what it thinks
-    # the state is
-
-    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
-    dom = conn.lookup_domain_by_uuid(vm.uuid)
-    dom.restore
-
-    setVmVncPort(vm, dom)
-
-    conn.close
-  rescue => ex
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
-
-  setVmState(vm, Vm::STATE_RUNNING)
-end
-
-def suspend_vm(task)
-  puts "suspend_vm"
-
-  # here, we are given an id for a VM to suspend; we have to lookup which
-  # physical host it is running on
-
-  vm = findVM(task)
-
-  if vm.state == Vm::STATE_SUSPENDED
-    # the VM is already suspended; just return success
-    return
-  elsif vm.state == Vm::STATE_STOPPED
-    raise "Cannot suspend stopped domain"
-  elsif vm.state == Vm::STATE_SAVED
-    raise "Cannot suspend saved domain"
-  end
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_SUSPENDING)
-
-  begin
-    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
-    dom = conn.lookup_domain_by_uuid(vm.uuid)
-    dom.suspend
-    conn.close
-  rescue => ex
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
-
-  # note that we do *not* reset the host_id here, since we just suspended the VM
-  # resume_vm will pick it up from here
-
-  setVmState(vm, Vm::STATE_SUSPENDED)
-end
-
-def resume_vm(task)
-  puts "resume_vm"
-
-  # here, we are given an id for a VM to start
-
-  vm = findVM(task)
-
-  # OK, marked in the database as already running on a host; let's check it
-
-  if vm.state == Vm::STATE_RUNNING
-    # the VM is already suspended; just return success
-    return
-  elsif vm.state == Vm::STATE_STOPPED
-    raise "Cannot resume stopped domain"
-  elsif vm.state == Vm::STATE_SAVED
-    raise "Cannot resume suspended domain"
-  end
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_RESUMING)
-
-  begin
-    conn = Libvirt::open("qemu+tcp://" + vm.host.hostname + "/system")
-    dom = conn.lookup_domain_by_uuid(vm.uuid)
-    dom.resume
-    conn.close
-  rescue => ex
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
-
-  setVmState(vm, Vm::STATE_RUNNING)
-end
-
-def update_state_vm(task)
-  puts "update_state_vm"
-
-  # NOTE: findVM() will only return a vm if all the host information is filled
-  # in.  So if a vm that we thought was stopped is running, this returns nil
-  # and we don't update any information about it.  The tricky part
-  # is that we're still not sure what to do in this case :).  - Ian
-  #
-  # Actually for migration it is necessary that it be able to update
-  # the host and state of the VM once it is migrated.
-  vm = findVM(task, false)
-  new_vm_state, host_id_str = task.args.split(",")
-  if (vm.host_id == nil) and host_id_str
-    vm.host_id = host_id_str.to_i
-  end
-
-
-  vm_effective_state = Vm::EFFECTIVE_STATE[vm.state]
-  task_effective_state = Vm::EFFECTIVE_STATE[new_vm_state]
-
-  if vm_effective_state != task_effective_state
-    vm.state = new_vm_state
-
-    if task_effective_state == Vm::STATE_STOPPED
-      setVmShutdown(vm)
-    end
-    vm.save!
-    puts "Updated state to " + new_vm_state
-  end
-end
-
-def migrate(vm, dest = nil)
-  if vm.state == Vm::STATE_STOPPED
-    raise "Cannot migrate stopped domain"
-  elsif vm.state == Vm::STATE_SUSPENDED
-    raise "Cannot migrate suspended domain"
-  elsif vm.state == Vm::STATE_SAVED
-    raise "Cannot migrate saved domain"
-  end
-
-  vm_orig_state = vm.state
-  setVmState(vm, Vm::STATE_MIGRATING)
-
-  begin
-    src_host = findHost(vm.host_id)
-    unless dest.nil? or dest.empty?
-      if dest.to_i == vm.host_id
-        raise "Cannot migrate from host " + src_host.hostname + " to itself!"
-      end
-      dst_host = findHost(dest.to_i)
-    else
-      dst_host = findHostSLA(vm)
-    end
-
-    src_conn = Libvirt::open("qemu+tcp://" + src_host.hostname + "/system")
-    dst_conn = Libvirt::open("qemu+tcp://" + dst_host.hostname + "/system")
-
-    connect_storage_pools(dst_conn, vm.storage_volumes)
-
-    dom = src_conn.lookup_domain_by_uuid(vm.uuid)
-    dom.migrate(dst_conn, Libvirt::Domain::MIGRATE_LIVE)
-
-    # if we didn't raise an exception, then the migration was successful.  We
-    # still have a pointer to the now-shutdown domain on the source side, so
-    # undefine it
-    begin
-      dom.undefine
-    rescue
-      # undefine can fail, for instance, if we live migrated from A -> B, and
-      # then we are shutting down the VM on B (because it only has "transient"
-      # XML).  Therefore, just ignore undefine errors so we do the rest
-      # FIXME: we really should have a marker in the database somehow so that
-      # we can tell if this domain was migrated; that way, we can tell the
-      # difference between a real undefine failure and one because of migration
-    end
-
-    teardown_storage_pools(src_conn)
-    dst_conn.close
-    src_conn.close
-  rescue => ex
-    # FIXME: ug.  We may have open connections that we need to close; not
-    # sure how to handle that
-    setVmState(vm, vm_orig_state)
-    raise ex
-  end
-
-  setVmState(vm, Vm::STATE_RUNNING)
-  vm.host_id = dst_host.id
-  vm.save!
-end
-
-def migrate_vm(task)
-  puts "migrate_vm"
-
-  # here, we are given an id for a VM to migrate; we have to lookup which
-  # physical host it is running on
-
-  vm = findVM(task)
-
-  migrate(vm, task.args)
-end
diff --git a/src/task-omatic/taskomatic.rb b/src/task-omatic/taskomatic.rb
index ce37058..19bd4f0 100755
--- a/src/task-omatic/taskomatic.rb
+++ b/src/task-omatic/taskomatic.rb
@@ -1,7 +1,8 @@
 #!/usr/bin/ruby
-# 
+#
 # Copyright (C) 2008 Red Hat, Inc.
-# Written by Chris Lalancette <clalance at redhat.com>
+# Written by Chris Lalancette <clalance at redhat.com> and
+# Ian Main <imain at redhat.com>
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -22,122 +23,748 @@ $: << File.join(File.dirname(__FILE__), "../dutils")
 $: << File.join(File.dirname(__FILE__), ".")
 
 require 'rubygems'
+require 'qpid'
+require 'monitor'
+require 'dutils'
 require 'optparse'
 require 'daemons'
 include Daemonize
 
-$logfile = '/var/log/ovirt-server/taskomatic.log'
+require 'task_vm'
+require 'task_storage'
+
+class TaskOmatic
+
+  include MonitorMixin
+
+  $logfile = '/var/log/ovirt-server/taskomatic.log'
+
+  def initialize()
+    super()
 
-do_daemon = true
-sleeptime = 5
-opts = OptionParser.new do |opts|
-  opts.on("-h", "--help", "Print help message") do
-    puts opts
-    exit
+    @sleeptime = 2
+    @nth_host = 0
+
+    @session = Qpid::Qmf::Session.new()
+    # FIXME: Should come from some kind of config or DNS SRV or what have you.
+    @broker = @session.add_broker("amqp://localhost:5672")
+
+    do_daemon = true
+
+    opts = OptionParser.new do |opts|
+      opts.on("-h", "--help", "Print help message") do
+        puts opts
+        exit
+      end
+      opts.on("-n", "--nodaemon", "Run interactively (useful for debugging)") do |n|
+        do_daemon = false
+      end
+      opts.on("-s N", Integer, "--sleep",
+              "Seconds to sleep between iterations (default is 5 seconds)") do |s|
+        sleeptime = s
+      end
+    end
+    begin
+      opts.parse!(ARGV)
+    rescue OptionParser::InvalidOption
+      puts opts
+      exit
+    end
+
+    if do_daemon
+      # This gets around a problem with paths for the database stuff.
+      # Normally daemonize would chdir to / but the paths for the database
+      # stuff are relative so it breaks it.. It's either this or rearrange
+      # things so the db stuff is included after daemonizing.
+      pwd = Dir.pwd
+      daemonize
+      Dir.chdir(pwd)
+      lf = open($logfile, 'a')
+      $stdout = lf
+      $stderr = lf
+    end
   end
-  opts.on("-n", "--nodaemon", "Run interactively (useful for debugging)") do |n|
-    do_daemon = !n
+
+  def find_capable_host(db_vm)
+    possible_hosts = []
+
+    # FIXME: There may be a bug here in that a host that's already running the
+    # vm won't be returned.  I think that's supposed to be for migration
+    # but it could break creation of VMs in certain conditions..
+
+    vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid)
+
+    db_vm.vm_resource_pool.get_hardware_pool.hosts.each do |curr|
+      # Now each of 'curr' is in the right hardware pool..
+      # now we check them out.
+
+      node = @session.object(:class => "node", 'hostname' => curr.hostname)
+      next unless node
+
+      # So now we expect if the node was found it's alive and well, then
+      # we check to make sure there's enough real cores for the number of
+      # vcpus, the node memory is adequate, the node is not disabled in the
+      # database, and if the node id is nil or if it is already running
+      # (has a node id set) then it is probably looking to migrate so we
+      # find a node that is not the current node.
+      #
+      # In the future we could add load or similar checks here.
+
+      #puts "checking node, #{node.cores} >= #{db_vm.num_vcpus_allocated},"
+      #puts "and #{node.memory} >= #{db_vm.memory_allocated}"
+      #puts "and not #{curr.is_disabled.nil?} and #{curr.is_disabled == 0}"
+      #puts "and #{vm ? vm : 'nil'} or #{vm ? vm.active : 'nil'}) or #{vm ? vm.node : 'nil'} != #{node.object_id}"
+
+      if node and node.cores >= db_vm.num_vcpus_allocated \
+         and node.memory >= db_vm.memory_allocated \
+         and not curr.is_disabled.nil? and curr.is_disabled == 0 \
+         and ((!vm or vm.active == 'false') or vm.node != node.object_id)
+        possible_hosts.push(curr)
+      end
+    end
+
+    #puts "possible_hosts.length = #{possible_hosts.length}"
+    if possible_hosts.length == 0
+      # we couldn't find a host that matches this criteria
+      raise "No host matching VM parameters could be found"
+    end
+
+    # Right now we're just picking the nth host, we could also look at
+    # how many vms are already on it, or the load of the hosts etc.
+    host = possible_hosts[@nth_host % possible_hosts.length]
+    @nth_host += 1
+
+    return host
   end
-  opts.on("-s N", Integer, "--sleep", "Seconds to sleep between iterations (default is 5 seconds)") do |s|
-    sleeptime = s
+
+  def connect_storage_pools(node, storage_volumes)
+    storagedevs = []
+    storage_volumes.each do |db_volume|
+      # here, we need to iterate through each volume and possibly attach it
+      # to the host we are going to be using
+      db_pool = db_volume.storage_pool
+      if db_pool == nil
+        # Hum.  Specified by the VM description, but not in the storage pool?
+        # continue on and hope for the best
+        puts "Couldn't find pool for volume #{db_volume.path}; skipping"
+        next
+      end
+
+      # we have to special case LVM pools.  In that case, we need to first
+      # activate the underlying physical device, and then do the logical one
+      if db_volume[:type] == "LvmStorageVolume"
+        phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
+        phys_libvirt_pool.connect(@session, node)
+      end
+
+      libvirt_pool = LibvirtPool.factory(db_pool)
+      libvirt_pool.connect(@session, node)
+
+      # OK, the pool should be all set.  The last thing we need to do is get
+      # the path based on the volume name
+
+      volume_name = db_volume.read_attribute(db_volume.volume_name)
+      pool = libvirt_pool.remote_pool
+      volume = @session.object(:class => 'volume',
+                               'name' => volume_name,
+                               'storagePool' => pool.object_id)
+      raise "Unable to find volume #{volume_name} attached to pool #{pool.name}." unless volume
+      storagedevs << volume.path
+    end
+
+    return storagedevs
   end
-end
-begin
-  opts.parse!(ARGV)
-rescue OptionParser::InvalidOption
-  puts opts
-  exit
-end
 
-if do_daemon
-  daemonize
-  STDOUT.reopen $logfile, 'a'
-  STDERR.reopen STDOUT
-end
+  def task_create_vm(task)
+    # This is mostly just a place holder.
+    vm = find_vm(task, false)
+    if vm.state != Vm::STATE_PENDING
+      raise "VM not pending"
+    end
+    vm.state = Vm::STATE_STOPPED
+    vm.save!
+  end
 
-begin
-  require 'dutils'
-rescue => ex
-  puts "dutils require failed! #{ex.class}: #{ex.message}"
-end
+  def teardown_storage_pools(node)
+
+    # This is rather silly because we only destroy pools if there are no
+    # more vms on the node.  We should be reference counting the pools
+    # somehow so we know when they are no longer in use.
+    vms = @session.objects(:class => 'domain', 'node' => node.object_id)
+    if vms.length > 0
+      return
+    end
+    pools = @session.objects(:class => 'pool', 'node' => node.object_id)
+
+    # FIXME: We need to undefine LVM volumes first.
+    pools.each do |pool|
+      result = pool.destroy
+      result = pool.undefine
+    end
+  end
+
+
+  def task_shutdown_or_destroy_vm(task, action)
+    db_vm = task.vm
+    vm = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+    if !vm
+      puts "VM already shut down?"
+      return
+    end
+
+    node = @session.object(:object_id => vm.node)
+    raise "Unable to get node that vm is on??" unless node
+
+    if vm.state == "shutdown" or vm.state == "shutoff"
+      set_vm_shut_down(db_vm)
+      return
+    elsif vm.state == "suspended"
+      raise "Cannot shutdown suspended domain"
+    elsif vm.state == "saved"
+      raise "Cannot shutdown saved domain"
+    end
+
+    if action == :shutdown
+      result = vm.shutdown
+      raise "Error shutting down VM: #{result.text}" unless result.status == 0
+    elsif action == :destroy
+      result = vm.destroy
+      raise "Error destroying VM: #{result.text}" unless result.status == 0
+    end
+
+    # undefine can fail, for instance, if we live migrated from A -> B, and
+    # then we are shutting down the VM on B (because it only has "transient"
+    # XML).  Therefore, just ignore undefine errors so we do the rest
+    # FIXME: we really should have a marker in the database somehow so that
+    # we can tell if this domain was migrated; that way, we can tell the
+    # difference between a real undefine failure and one because of migration
+    result = vm.undefine
+    puts "Error undefining VM: #{result.text}" unless result.status == 0
+
+    teardown_storage_pools(node)
+
+    set_vm_shut_down(db_vm)
+  end
+
+  def task_start_vm(task)
+    db_vm = find_vm(task, false)
+
+    # Kinda silly?  I dunno about these intermediate states..
+    set_vm_state(db_vm, Vm::STATE_STARTING)
+
+    vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid)
+
+    if vm
+      case vm.state
+        when "running"
+          return
+        when "blocked"
+          raise "Virtual machine state is blocked, cannot start VM."
+        when "paused"
+          raise "Virtual machine is currently paused, cannot start, must resume."
+      end
+    end
+    db_host = find_capable_host(db_vm)
+
+    node = @session.object(:class => "node", 'hostname' => db_host.hostname)
+
+    raise "Unable to find host #{db_host.hostname} to create VM on." unless node
+
+    image_volume = task_storage_cobbler_setup(db_vm)
+
+    # FIXME: I know this part is broken..
+    #
+    # hrrm, who wrote this comment and why is it broken?  - Ian
+    volumes = []
+    volumes += db_vm.storage_volumes
+    volumes << image_volume if image_volume
+    storagedevs = connect_storage_pools(node, volumes)
+
+    # FIXME: get rid of the hardcoded bridge
+    xml = create_vm_xml(db_vm.description, db_vm.uuid, db_vm.memory_allocated,
+              db_vm.memory_used, db_vm.num_vcpus_allocated, db_vm.boot_device,
+              db_vm.vnic_mac_addr, "ovirtbr0", storagedevs)
+
+    result = node.domainDefineXML(xml.to_s)
+    raise "Error defining virtual machine: #{result.text}" unless result.status == 0
+
+    domain = @session.object(:object_id => result.domain)
+    raise "Cannot find domain on host #{db_host.hostname}, cannot start virtual machine." unless domain
+
+    result = domain.create
+    if result.status != 0
+      domain.undefine
+      raise "Error creating virtual machine: #{result.text}"
+    end
+
+    result = domain.getXMLDesc
+
+    # Reget the db record or you can get 'dirty' errors.  This can happen in a number
+    # of places so you'll see a lot of .reloads.
+    db_vm.reload
+    set_vm_vnc_port(db_vm, result.description) if result.status == 0
+
+    # This information is not available via the libvirt interface.
+    db_vm.memory_used = db_vm.memory_allocated
+    db_vm.boot_device = Vm::BOOT_DEV_HD
+    db_vm.host_id = db_host.id
+
+    # We write the new state here even though dbomatic will set it soon anyway.
+    # This is just to let the UI know that it's good to go right away and really
+    # dbomatic will just write the same thing over top of it soon enough.
+    db_vm.state = Vm::STATE_RUNNING
+    db_vm.save!
+  end
+
+  def task_suspend_vm(task)
+    db_vm = task.vm
+    dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+    raise "Unable to locate VM to suspend" unless dom
+
+    if dom.state != "running" and dom.state != "blocked"
+      raise "Cannot suspend domain in state #{dom.state}"
+    end
+
+    result = dom.suspend
+    raise "Error suspending VM: #{result.text}" unless result.status == 0
+
+    db_vm.reload
+    db_vm.state = Vm::STATE_SUSPENDED
+    db_vm.save!
+  end
+
+  def task_resume_vm(task)
+    db_vm = task.vm
+    dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+    raise "Unable to locate VM to resume" unless dom
+
+    if dom.state == "running"
+      # the VM is already suspended; just return success
+      return
+    elsif dom.state != "paused"
+      raise "Cannot suspend domain in state #{dom.state}"
+    end
+
+    result = dom.resume
+    raise "Error resuming VM: #{result.text}" unless result.status == 0
+
+    db_vm.reload
+    db_vm.state = Vm::STATE_RUNNING
+    db_vm.save!
+  end
+
+  def task_save_vm(task)
+
+    # FIXME: This task is actually very broken.  It saves to a local
+    # disk on the node which could be volatile memory, and there is no
+    # differentiation of a 'saved' vm in libvirt which makes it so we
+    # really have no way of knowing when a domain is 'saved'.  We
+    # need to put it on the storage server and mark it in the database
+    # where the image is stored.
+    db_vm = task.vm
+    dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+    raise "Unable to locate VM to save" unless dom
+
+    filename = "/tmp/#{dom.uuid}.save"
+    puts "saving vm #{dom.name} to #{filename}"
+    result = dom.save(filename)
+    raise "Error saving VM: #{result.text}" unless result.status == 0
+
+    db_vm.reload
+    set_vm_state(db_vm, Vm::STATE_SAVED)
+  end
+
+  def task_restore_vm(task)
+
+    # FIXME: This is also broken, see task_save_vm FIXME.
+    db_vm = task.vm
+    dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid)
+    raise "Unable to locate VM to restore" unless dom
+
+    filename = "/tmp/#{dom.uuid}.save"
+    puts "restoring vm #{dom.name} from #{filename}"
+    result = dom.restore("/tmp/" + dom.uuid + ".save")
+    raise "Error restoring VM: #{result.text}" unless result.status == 0
+
+    set_vm_state(db_vm, Vm::STATE_RUNNING)
+  end
+
+  def migrate(db_vm, dest = nil)
+
+    vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid)
+    raise "Unable to find VM to migrate" unless vm
+    src_node = @session.object(:object_id => vm.node)
+    raise "Unable to find node that VM is on??" unless src_node
+
+    puts "Migrating domain lookup complete, domain is #{vm}"
+
+    vm_orig_state = db_vm.state
+    set_vm_state(db_vm, Vm::STATE_MIGRATING)
+
+    begin
+      unless dest.nil? or dest.empty?
+        if dest.to_i == db_vm.host_id
+          raise "Cannot migrate from host " + src_node.hostname + " to itself!"
+        end
+        db_dst_host = find_host(dest.to_i)
+      else
+        db_dst_host = find_capable_host(db_vm)
+      end
+
+      dest_node = @session.object(:class => 'node', 'hostname' => db_dst_host.hostname)
+      raise "Unable to find host #{db_dst_host.hostname} to migrate to." unless dest_node
+
+      volumes = []
+      volumes += db_vm.storage_volumes
+      connect_storage_pools(dest_node, volumes)
+
+      # Sadly migrate with qpid is broken because it requires a connection between
+      # both nodes and currently that can't happen securely.  For now we do it
+      # the old fashioned way..
+      src_conn = Libvirt::open("qemu+tcp://" + src_node.hostname + "/system")
+      dst_conn = Libvirt::open("qemu+tcp://" + dest_node.hostname + "/system")
+      dom = src_conn.lookup_domain_by_uuid(vm.uuid)
+      dom.migrate(dst_conn, Libvirt::Domain::MIGRATE_LIVE)
+      src_conn.close
+      dst_conn.close
+
+      # undefine can fail, for instance, if we live migrated from A -> B, and
+      # then we are shutting down the VM on B (because it only has "transient"
+      # XML).  Therefore, just ignore undefine errors so we do the rest
+      # FIXME: we really should have a marker in the database somehow so that
+      # we can tell if this domain was migrated; that way, we can tell the
+      # difference between a real undefine failure and one because of migration
+      result = vm.undefine
+
+      # Note this is just a puts!  Not a raise! :)
+      puts "Error undefining old vm after migrate: #{result.text}" unless result.status == 0
+
+      # See if we can take down storage pools on the src host.
+      teardown_storage_pools(src_node)
+    rescue => ex
+      puts "Error: #{ex}"
+      set_vm_state(db_vm, vm_orig_state)
+      raise ex
+    end
+
+    db_vm.reload
+    db_vm.state = Vm::STATE_RUNNING
+    db_vm.host_id = db_dst_host.id
+    db_vm.save!
+  end
+
+  def task_migrate_vm(task)
+    puts "migrate_vm"
+
+    # here, we are given an id for a VM to migrate; we have to lookup which
+    # physical host it is running on
+    vm = find_vm(task)
+    migrate(vm, task.args)
+  end
+
+  def storage_find_suitable_host(hardware_pool)
+    # find all of the hosts in the same pool as the storage
+    hardware_pool.hosts.each do |host|
+      puts "storage_find_suitable_host: host #{host.hostname} uuid #{host.uuid}"
+      puts "host.is_disabled is #{host.is_disabled}"
+      if host.is_disabled.to_i != 0
+        puts "host #{host.hostname} is disabled"
+        next
+      end
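+      # Only consider hosts that libvirt-qpid currently reports to the
+      # broker; if no node object exists, the host isn't connected.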
+      node = @session.object(:class => 'node', 'hostname' => host.hostname)
+      return node if node
+    end
+
+    raise "Could not find a host within this storage pool to scan the storage server."
+  end
+
+  def add_volumes_to_db(db_pool, libvirt_pool, owner = nil, group = nil, mode = nil)
+    # FIXME: this is currently broken if you do something like:
+    # 1.  Add an iscsi pool with 3 volumes (lun-1, lun-2, lun-3)
+    # 2.  Scan it in
+    # 3.  Remove lun-3 from the pool
+    # 4.  Re-scan it
+    # What will happen is that you will still have lun-3 available in the
+    # database, even though it's not available in the pool anymore.  It's a
+    # little tricky, though; we have to make sure that we don't pull the
+    # database entry out from underneath a possibly running VM (or do we?)
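+    #
+    # One possible (untested) sketch for pruning stale entries: collect the
+    # volume names libvirt reports for this pool and destroy any database
+    # volume in the pool that is missing from that set, e.g.
+    #
+    #   seen = @session.objects(:class => 'volume',
+    #                           'storagePool' => libvirt_pool.remote_pool.object_id).collect { |v| v.name }
+    #   StorageVolume.find(:all, :conditions => ["storage_pool_id = ?", db_pool.id]).each do |db_vol|
+    #     db_vol.destroy unless seen.include?(db_vol.read_attribute(db_vol.volume_name))
+    #   end
+    #
+    # subject to the caveat above about volumes still in use by a running VM.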
+    volumes = @session.objects(:class => 'volume',
+                               'storagePool' => libvirt_pool.remote_pool.object_id)
+    volumes.each do |volume|
+      storage_volume = StorageVolume.factory(db_pool.get_type_label)
+
+      # NOTE: it is safe (and, in fact, necessary) to use
+      # #{storage_volume.volume_name} here without sanitizing it.  This is
+      # because this is *not* based on user modifiable data, but rather, on an
+      # internal implementation detail
+      existing_vol = StorageVolume.find(:first, :conditions =>
+                        ["storage_pool_id = ? AND #{storage_volume.volume_name} = ?",
+                        db_pool.id, volume.name])
+
+      # in this case, this path already exists in the database; just skip
+      next if existing_vol
+
+      storage_volume = StorageVolume.factory(db_pool.get_type_label)
+      storage_volume.path = volume.path
+      storage_volume.size = volume.capacity / 1024
+      storage_volume.storage_pool_id = db_pool.id
+      storage_volume.write_attribute(storage_volume.volume_name, volume.name)
+      storage_volume.lv_owner_perms = owner
+      storage_volume.lv_group_perms = group
+      storage_volume.lv_mode_perms = mode
+      storage_volume.state = StorageVolume::STATE_AVAILABLE
+      puts "saving storage volume to db."
+      storage_volume.save!
+    end
+  end
+
+  # The words "pool" and "volume" are ridiculously overloaded in our context.
+  # Therefore, the task_refresh_pool method adopts this convention:
+  # db_pool_phys: The underlying physical storage pool, as it is represented in
+  #               the database
+  # phys_libvirt_pool: The underlying physical storage, as it is represented in
+  #                    libvirt
+  # db_lvm_pool: The logical storage pool (if it exists), as it is represented
+  #              in the database
+  # lvm_libvirt_pool: The logical storage pool (if it exists), as it is
+  #                   represented in libvirt
+
+  def task_refresh_pool(task)
+    puts "refresh_pool"
+
+    db_pool_phys = task.storage_pool
+    raise "Could not find storage pool" unless db_pool_phys
+
+    node = storage_find_suitable_host(db_pool_phys.hardware_pool)
+
+    # FIXME: We may want to scan through all the LVM volumes available
+    # and just update the database with allocation information.
+    # However afaict right now libvirt provides no way for us to know
+    # where an LVM pool/volume sits in terms of its physical pool/volume
+    # so we're kinda screwed for now for updating the database.
+    #
+    #   Ian
+    begin
+      phys_libvirt_pool = LibvirtPool.factory(db_pool_phys)
+      phys_libvirt_pool.connect(@session, node)
 
-require 'task_vm'
-require 'task_storage'
-require 'task_host'
-
-loop do
-  tasks = Array.new
-  begin
-    tasks = Task.find(:all, :conditions => [ "state = ?", Task::STATE_QUEUED ])
-  rescue => ex
-    puts "1 #{ex.class}: #{ex.message}"
-    if Task.connected?
       begin
-        ActiveRecord::Base.connection.reconnect!
-      rescue => norecon
-        puts "2 #{norecon.class}: #{norecon.message}"
+        # OK, the pool is all set.  Add in all of the volumes
+        add_volumes_to_db(db_pool_phys, phys_libvirt_pool)
+
+        db_pool_phys.state = StoragePool::STATE_AVAILABLE
+        db_pool_phys.save!
+      end
+    ensure
+      phys_libvirt_pool.shutdown
+    end
+  end
+
+  def task_create_volume(task)
+    puts "create_volume"
+
+    db_volume = task.storage_volume
+    raise "Could not find storage volume to create" unless db_volume
+
+    db_pool = db_volume.storage_pool
+    raise "Could not find storage pool" unless db_pool
+
+    node = storage_find_suitable_host(db_pool.hardware_pool)
+
+    begin
+      if db_volume[:type] == "LvmStorageVolume"
+        phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
+        phys_libvirt_pool.connect(@session, node)
       end
-    else
+
       begin
-        database_connect
-      rescue => ex
-        puts "3 #{ex.class}: #{ex.message}"
+        libvirt_pool = LibvirtPool.factory(db_pool)
+
+        begin
+          libvirt_pool.connect(@session, node)
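+          # create_vol returns the qpid object id of the new volume; look it
+          # up below so we can record the volume's path in the database.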
+          volume_id = libvirt_pool.create_vol(*db_volume.volume_create_params)
+          volume = @session.object(:object_id => volume_id)
+          raise "Unable to find newly created volume" unless volume
+
+          puts "  volume:"
+          for (key, val) in volume.properties
+            puts "    property: #{key}, #{val}"
+          end
+
+          # FIXME: Should have this too I think..
+          #db_volume.key = volume.key
+          db_volume.reload
+          db_volume.path = volume.path
+          db_volume.state = StorageVolume::STATE_AVAILABLE
+          db_volume.save!
+
+          db_pool.reload
+          db_pool.state = StoragePool::STATE_AVAILABLE
+          db_pool.save!
+        ensure
+          libvirt_pool.shutdown
+        end
+      ensure
+        if db_volume[:type] == "LvmStorageVolume"
+          phys_libvirt_pool.shutdown
+        end
       end
     end
   end
-  tasks.each do |task|
-    # make sure we get our credentials up-front
-    get_credentials
 
-    task.time_started = Time.now
-    task.state = Task::STATE_RUNNING
-    task.save!
+  def task_delete_volume(task)
+    puts "delete_volume"
+
+    db_volume = task.storage_volume
+    raise "Could not find storage volume to create" unless db_volume
+
+    db_pool = db_volume.storage_pool
+    raise "Could not find storage pool" unless db_pool
+
+    node = storage_find_suitable_host(db_pool.hardware_pool)
 
-    state = Task::STATE_FINISHED
     begin
-      case task.action
-      when VmTask::ACTION_CREATE_VM then create_vm(task)
-      when VmTask::ACTION_SHUTDOWN_VM then shutdown_vm(task)
-      when VmTask::ACTION_POWEROFF_VM then poweroff_vm(task)
-      when VmTask::ACTION_START_VM then start_vm(task)
-      when VmTask::ACTION_SUSPEND_VM then suspend_vm(task)
-      when VmTask::ACTION_RESUME_VM then resume_vm(task)
-      when VmTask::ACTION_SAVE_VM then save_vm(task)
-      when VmTask::ACTION_RESTORE_VM then restore_vm(task)
-      when VmTask::ACTION_UPDATE_STATE_VM then update_state_vm(task)
-      when VmTask::ACTION_MIGRATE_VM then migrate_vm(task)
-      when StorageTask::ACTION_REFRESH_POOL then refresh_pool(task)
-      when StorageVolumeTask::ACTION_CREATE_VOLUME then create_volume(task)
-      when StorageVolumeTask::ACTION_DELETE_VOLUME then delete_volume(task)
-      when HostTask::ACTION_CLEAR_VMS then clear_vms_host(task)
-      else
-        puts "unknown task " + task.action
-        state = Task::STATE_FAILED
-        task.message = "Unknown task type"
+      if db_volume[:type] == "LvmStorageVolume"
+        phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume)
+        phys_libvirt_pool.connect(@session, node)
+        puts "connected to lvm pool.."
       end
-    rescue => ex
-      puts "Task action processing failed: #{ex.class}: #{ex.message}"
-      puts ex.backtrace
-      state = Task::STATE_FAILED
-      task.message = ex.message
-    end
-
-    task.state = state
-    task.time_ended = Time.now
-    task.save!
-    puts "done"
-  end
-
-  # FIXME: here, we clean up "orphaned" tasks.  These are tasks that we had
-  # to orphan (set task_target to nil) because we were deleting the object they
-  # depended on.
-  Task.find(:all, :conditions => [ "task_target_id IS NULL and task_target_type IS NULL" ]).each do |task|
-    task.destroy
-  end
-  
-  # we could destroy credentials, but another process might be using them (in
-  # particular, host-browser).  Just leave them around, it shouldn't hurt
-  
-  STDOUT.flush
-  sleep sleeptime
+
+      begin
+        libvirt_pool = LibvirtPool.factory(db_pool)
+        libvirt_pool.connect(@session, node)
+
+        begin
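+          # Look up the qpid volume object in this pool by the path we
+          # recorded in the database when the volume was created.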
+          volume = @session.object(:class => 'volume',
+                                   'storagePool' => libvirt_pool.remote_pool.object_id,
+                                   'path' => db_volume.path)
+          puts "Unable to find volume to delete" unless volume
+
+          # FIXME: we actually probably want to zero out the whole volume
+          # here, so we aren't potentially leaking data from one user
+          # to another.  There are two problems, though:
+          # 1)  I'm not sure how I would go about zero'ing the data on a remote
+          # machine, since there is no "libvirt_write_data" call
+          # 2)  This could potentially take quite a while, so we want to spawn
+          # off another thread to do it
+          result = volume.delete
+          raise "Error deleting volume: #{result.text}" unless result.status == 0
+
+          # Note: we have to nil out the task_target because when we delete the
+          # volume object, that also deletes all dependent tasks (including this
+          # one), which leads to accessing stale tasks.  Orphan the task, then
+          # delete the object; we can clean up orphans later (or not, depending
+          # on the audit policy)
+          task.reload
+          task.task_target = nil
+          task.save!
+
+          db_volume.destroy
+        ensure
+          libvirt_pool.shutdown
+        end
+      ensure
+        if db_volume[:type] == "LvmStorageVolume"
+          phys_libvirt_pool.shutdown
+        end
+      end
+    end
+  end
+
+  def task_clear_vms_host(task)
+    src_host = task.host
+
+    src_host.vms.each do |vm|
+      migrate(vm)
+    end
+  end
+
+  def mainloop()
+    loop do
+      tasks = Array.new
+      begin
+        tasks = Task.find(:all, :conditions =>
+                          [ "state = ?", Task::STATE_QUEUED ])
+      rescue => ex
+        puts "1 #{ex.class}: #{ex.message}"
+        if Task.connected?
+          begin
+            ActiveRecord::Base.connection.reconnect!
+          rescue => norecon
+            puts "2 #{norecon.class}: #{norecon.message}"
+          end
+        else
+          begin
+            database_connect
+          rescue => ex
+            puts "3 #{ex.class}: #{ex.message}"
+          end
+        end
+      end
+
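+      # Dispatch each queued task to its handler.  Any exception marks the
+      # task FAILED and records the exception message on the task.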
+      tasks.each do |task|
+        # make sure we get our credentials up-front
+        get_credentials
+
+        task.time_started = Time.now
+
+        state = Task::STATE_FINISHED
+        begin
+          case task.action
+            when VmTask::ACTION_CREATE_VM
+              task_create_vm(task)
+            when VmTask::ACTION_SHUTDOWN_VM
+              task_shutdown_or_destroy_vm(task, :shutdown)
+            when VmTask::ACTION_POWEROFF_VM
+              task_shutdown_or_destroy_vm(task, :destroy)
+            when VmTask::ACTION_START_VM
+              task_start_vm(task)
+            when VmTask::ACTION_SUSPEND_VM
+              task_suspend_vm(task)
+            when VmTask::ACTION_RESUME_VM
+              task_resume_vm(task)
+            when VmTask::ACTION_SAVE_VM
+              task_save_vm(task)
+            when VmTask::ACTION_RESTORE_VM
+              task_restore_vm(task)
+            when VmTask::ACTION_MIGRATE_VM
+              task_migrate_vm(task)
+            when StorageTask::ACTION_REFRESH_POOL
+              task_refresh_pool(task)
+            when StorageVolumeTask::ACTION_CREATE_VOLUME
+              task_create_volume(task)
+            when StorageVolumeTask::ACTION_DELETE_VOLUME
+              task_delete_volume(task)
+            when HostTask::ACTION_CLEAR_VMS
+              task_clear_vms_host(task)
+          else
+            puts "unknown task " + task.action
+            state = Task::STATE_FAILED
+            task.message = "Unknown task type"
+          end
+        rescue => ex
+          puts "Task action processing failed: #{ex.class}: #{ex.message}"
+          puts ex.backtrace
+          state = Task::STATE_FAILED
+          task.message = ex.message
+        end
+
+        task.state = state
+        task.time_ended = Time.now
+        task.save!
+        puts "done"
+      end
+      # FIXME: here, we clean up "orphaned" tasks.  These are tasks
+      # that we had to orphan (set task_target to nil) because we were
+      # deleting the object they depended on.
+      Task.find(:all, :conditions =>
+                [ "task_target_id IS NULL and task_target_type IS NULL" ]).each do |task|
+        task.destroy
+      end
+      sleep(@sleeptime)
+    end
+  end
 end
+
+taskomatic = TaskOmatic.new()
+taskomatic.mainloop()
+
-- 
1.6.0.4



