[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

Re: [libvirt] Help needed: simple python api to obtain events



On Tue, Oct 06, 2009 at 07:04:29PM +0200, Dan Kenigsberg wrote:
> Would someone help me have a shrink-wrapped solution for obtaining libvirt
> events in python?

I decided to re-write the demo program so that is shows a serious
production kwalitee event loop implementation that can be used in
real world applications. I think you'll find this much nicer :-)

I propose we add this example to examples/domain-events/events-py
instead of the code we currently have

Daniel

#!/usr/bin/python -u
#
#
#
#################################################################################
# Start off by implementing a general purpose event loop for anyones use
#################################################################################

import sys
import getopt
import os
import libvirt
import select
import errno
import time
import threading

class virEventLoopPure:
    class virEventLoopPureHandle:
        def __init__(self, handle, fd, events, cb, opaque):
            self.handle = handle
            self.fd = fd
            self.events = events
            self.cb = cb
            self.opaque = opaque

        def get_id(self):
            return self.handle

        def get_fd(self):
            return self.fd

        def get_events(self):
            return self.events

        def set_events(self, events):
            self.events = events

        def dispatch(self, events):
            self.cb(self.handle,
                    self.fd,
                    events,
                    self.opaque[0],
                    self.opaque[1])

    class virEventLoopPureTimer:
        def __init__(self, timer, interval, cb, opaque):
            self.timer = timer
            self.interval = interval
            self.cb = cb
            self.opaque = opaque
            self.lastfired = 0

        def get_id(self):
            return self.timer

        def get_interval(self):
            return self.interval

        def set_interval(self, interval):
            self.interval = interval

        def get_last_fired(self):
            return self.lastfired

        def set_last_fired(self, now):
            self.lastfired = now

        def dispatch(self):
            self.cb(self.timer,
                    self.opaque[0],
                    self.opaque[1])

    def __init__(self, debug=False):
        self.debugOn = debug
        self.poll = select.poll()
        self.pipetrick = os.pipe()
        self.nextHandleID = 1
        self.nextTimerID = 1
        self.handles = []
        self.timers = []
        self.quit = False

        self.debug("Self pipe watch %d write %d" %(self.pipetrick[0], self.pipetrick[1]))
        self.poll.register(self.pipetrick[0], select.POLLIN)

    def debug(self, msg):
        if self.debugOn:
            print msg

    def next_timeout(self):
        next = 0
        for t in self.timers:
            last = t.get_last_fired()
            interval = t.get_interval()
            if interval < 0:
                continue
            if next == 0 or (last + interval) < next:
                next = last + interval

        return next

    def get_handle_by_fd(self, fd):
        for h in self.handles:
            if h.get_fd() == fd:
                return h
        return None

    def get_handle_by_id(self, handleID):
        for h in self.handles:
            if h.get_id() == handleID:
                return h
        return None

    def run_once(self):
        sleep = -1
        next = self.next_timeout()
        self.debug("Next timeout due at %d" % next)
        if next > 0:
            now = int(time.time())
            if now >= next:
                sleep = 0
            else:
                sleep = next - now

        self.debug("Poll with a sleep of %d" % sleep)
        events = self.poll.poll(sleep)

        for (fd, revents) in events:
            if fd == self.pipetrick[0]:
                data = os.read(fd, 1)
                continue

            h = self.get_handle_by_fd(fd)
            if h:
                self.debug("Dispatch fd %d handle %d events %d" % (fd, h.get_id(), revents))
                h.dispatch(self.events_from_poll(revents))

        now = int(time.time())
        for t in self.timers:
            interval = t.get_interval()
            if interval < 0:
                continue

            want = t.get_last_fired() + interval
            # Deduct 20ms, since schedular timeslice
            # means we could be ever so slightly early
            if now >= (want-20):
                self.debug("Dispatch timer %d now %s want %s" % (t.get_id(), str(now), str(want)))
                t.set_last_fired(now)
                t.dispatch()

    def run_loop(self):
        self.quit = False
        while not self.quit:
            self.run_once()

    def interrupt(self):
        os.write(self.pipetrick[1], 'c')

    def add_handle(self, fd, events, cb, opaque):
        handleID = self.nextHandleID + 1
        self.nextHandleID = self.nextHandleID + 1

        h = self.virEventLoopPureHandle(handleID, fd, events, cb, opaque)
        self.handles.append(h)

        self.poll.register(fd, self.events_to_poll(events))
        self.interrupt()

        self.debug("Add handle %d fd %d events %d" % (handleID, fd, events))

        return handleID

    def add_timer(self, interval, cb, opaque):
        timerID = self.nextTimerID + 1
        self.nextTimerID = self.nextTimerID + 1

        h = self.virEventLoopPureTimer(timerID, interval, cb, opaque)
        self.timers.append(h)
        self.interrupt()

        self.debug("Add timer %d interval %d" % (timerID, interval))

        return timerID

    def update_handle(self, handleID, events):
        h = self.get_handle_by_id(handleID)
        if h:
            h.set_events(events)
            self.poll.unregister(h.get_fd())
            self.poll.register(h.get_fd(), self.events_to_poll(events))
            self.interrupt()

            self.debug("Update handle %d fd %d events %d" % (handleID, h.get_fd(), events))

    def update_timer(self, timerID, interval):
        for h in self.timers:
            if h.get_id() == timerID:
                h.set_interval(interval);
                self.interrupt()

                self.debug("Update timer %d interval %d"  % (timerID, interval))
                break

    def remove_handle(self, handleID):
        handles = []
        for h in self.handles:
            if h.get_id() == handleID:
                self.poll.unregister(h.get_fd())
                self.debug("Remove handle %d fd %d" % (handleID, h.get_fd()))
            else:
                handles.append(h)
        self.handles = handles
        self.interrupt()

    def remove_timer(self, timerID):
        timers = []
        for h in self.timers:
            if h.get_id() != timerID:
                timers.append(h)
                self.debug("Remove timer %d" % timerID)
        self.timers = timers
        self.interrupt()

    def events_to_poll(self, events):
        ret = 0
        if events & libvirt.VIR_EVENT_HANDLE_READABLE:
            ret |= select.POLLIN
        if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
            ret |= select.POLLOUT
        if events & libvirt.VIR_EVENT_HANDLE_ERROR:
            ret |= select.POLLERR;
        if events & libvirt.VIR_EVENT_HANDLE_HANGUP:
            ret |= select.POLLHUP;
        return ret

    def events_from_poll(self, events):
        ret = 0;
        if events & select.POLLIN:
            ret |= libvirt.VIR_EVENT_HANDLE_READABLE;
        if events & select.POLLOUT:
            ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE;
        if events & select.POLLNVAL:
            ret |= libvirt.VIR_EVENT_HANDLE_ERROR;
        if events & select.POLLERR:
            ret |= libvirt.VIR_EVENT_HANDLE_ERROR;
        if events & select.POLLHUP:
            ret |= libvirt.VIR_EVENT_HANDLE_HANGUP;
        return ret;


###########################################################################
# Now glue an instance of the general event loop into libvirt's event loop
###########################################################################

eventLoop = virEventLoopPure(debug=False)
eventLoopThread = None

def virEventAddHandleImpl(fd, events, cb, opaque):
    global eventLoop
    return eventLoop.add_handle(fd, events, cb, opaque)

def virEventUpdateHandleImpl(handleID, events):
    global eventLoop
    return eventLoop.update_handle(handleID, events)

def virEventRemoveHandleImpl(handleID):
    global eventLoop
    return eventLoop.remove_handle(handleID)

def virEventAddTimerImpl(interval, cb, opaque):
    global eventLoop
    return eventLoop.add_timer(interval, cb, opaque)

def virEventUpdateTimerImpl(timerID, interval):
    global eventLopo
    return eventLoop.update_timer(timerID, interval)

def virEventRemoveTimerImpl(timerID):
    global eventLoop
    return eventLoop.remove_timer(timerID)

def virEventLoopPureRegister():
    libvirt.virEventRegisterImpl(virEventAddHandleImpl,
                                 virEventUpdateHandleImpl,
                                 virEventRemoveHandleImpl,
                                 virEventAddTimerImpl,
                                 virEventUpdateTimerImpl,
                                 virEventRemoveTimerImpl)

def virEventLoopPureRun():
    global eventLoop
    eventLoop.run_loop()

def virEventLoopPureStart():
    global eventLoopThread
    virEventLoopPureRegister()
    eventLoopThread = threading.Thread(target=virEventLoopPureRun, name="libvirtEventLoop")
    eventLoopThread.setDaemon(True)
    eventLoopThread.start()


##########################################################################
# Everything that now follows is a simple demo of domain lifecycle events
##########################################################################
def eventToString(event):
    eventStrings = ( "Added",
                     "Removed",
                     "Started",
                     "Suspended",
                     "Resumed",
                     "Stopped",
                     "Saved",
                     "Restored" );
    return eventStrings[event];

def myDomainEventCallback1 (conn, dom, event, detail, opaque):
    print "myDomainEventCallback1 EVENT: Domain %s(%s) %s %d" % (dom.name(), dom.ID(), eventToString(event), detail)

def myDomainEventCallback2 (conn, dom, event, detail, opaque):
    print "myDomainEventCallback2 EVENT: Domain %s(%s) %s %d" % (dom.name(), dom.ID(), eventToString(event), detail)

def usage():
        print "usage: "+os.path.basename(sys.argv[0])+" [uri]"
        print "   uri will default to qemu:///system"

def main():
    try:
        opts, args = getopt.getopt(sys.argv[1:], "h", ["help"] )
    except getopt.GetoptError, err:
        # print help information and exit:
        print str(err) # will print something like "option -a not recognized"
        usage()
        sys.exit(2)
    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()

    if len(sys.argv) > 1:
        uri = sys.argv[1]
    else:
        uri = "qemu:///system"

    print "Using uri:" + uri

    # Run a background thread with the event loop
    virEventLoopPureStart()

    vc = libvirt.open(uri)

    # Close connection on exit (to test cleanup paths)
    old_exitfunc = getattr(sys, 'exitfunc', None)
    def exit():
        print "Closing " + str(vc)
        vc.close()
        if (old_exitfunc): old_exitfunc()
    sys.exitfunc = exit

    #Add 2 callbacks to prove this works with more than just one
    vc.domainEventRegister(myDomainEventCallback1,None)
    vc.domainEventRegister(myDomainEventCallback2,None)

    # The rest of your app would go here normally, but for sake
    # of demo we'll just go to sleep. The other option is to
    # run the event loop in your main thread if your app is
    # totally event based.
    while 1:
        time.sleep(1)


if __name__ == "__main__":
    main()


-- 
|: Red Hat, Engineering, London   -o-   http://people.redhat.com/berrange/ :|
|: http://libvirt.org  -o-  http://virt-manager.org  -o-  http://ovirt.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505  -o-  F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]