
Re: [Pulp-list] Sanity Check





On 09/23/2010 10:49 AM, Bryan Kearney wrote:
On 09/23/2010 09:58 AM, Jeff Ortel wrote:
All,

I'd like to get a quick sanity check on how I designed the dispatching
of API methods (that rely on RMI on the agent) through the task
subsystem. The approach is simple enough. Hopefully, the long write up
will not make it seem more complicated than it really is :)

First, add classes to pulp.server.async.py that mirror the
pulp.server.agent.Agent class but perform RMI asynchronously by default
and specify the same correlation tag as the reply listener. The
pulp.server.agent.Agent does synchronous RMI by default. This way,
the replies are guaranteed to match what the reply listener is listening
on.
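
A minimal sketch of the first step, assuming a dictionary-shaped request envelope. The names CTAG, FakeTransport, invoke() and the envelope keys are hypothetical stand-ins for illustration, not the actual Pulp or messaging API. The point it shows is that the asynchronous proxy stamps every request with the same correlation tag the reply listener uses and returns without waiting.

```python
import uuid

# Hypothetical correlation tag shared by the sender and the reply listener.
CTAG = "pulp.task"


class FakeTransport:
    """Stand-in for the messaging layer; it only records outgoing requests."""

    def __init__(self):
        self.sent = []

    def send(self, envelope):
        self.sent.append(envelope)


class AsyncAgent:
    """Mirrors a synchronous agent proxy, but each call is fire-and-forget."""

    def __init__(self, consumer_id, transport, ctag=CTAG):
        self.consumer_id = consumer_id
        self.transport = transport
        self.ctag = ctag

    def invoke(self, method, *args, task_id=None):
        # Build the request; "replyto" carries the correlation tag and
        # "task_id" lets the listener find the task when the reply arrives.
        envelope = {
            "sn": str(uuid.uuid4()),
            "consumer": self.consumer_id,
            "method": method,
            "args": args,
            "replyto": self.ctag,
            "task_id": task_id,
        }
        self.transport.send(envelope)
        # Return immediately with the serial number; no reply is awaited.
        return envelope["sn"]
```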

Second, the ReplyListener is added to pulp.server.async.py. Its job is
to listen for asynchronous RMI replies based on the same correlation tag
specified when the RMI was invoked as described above. When replies are
received, the associated task is updated by calling either
Task.succeeded() or Task.failed(). The task is also updated with the RMI
returned value or exception as appropriate.
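
The listener described above could look roughly like the following. This is a sketch under stated assumptions: the reply is modelled as a plain dict with hypothetical keys ("ctag", "task_id", "retval", "exception"), and StubTask and dispatch() are invented for illustration rather than taken from the Pulp code.

```python
class StubTask:
    """Minimal stand-in for a task that waits on an external reply."""

    def __init__(self, task_id):
        self.id = task_id
        self.state = "running"
        self.result = None
        self.exception = None

    def succeeded(self, result):
        self.result = result
        self.state = "finished"

    def failed(self, exception):
        self.exception = exception
        self.state = "error"


class ReplyListener:
    """Routes asynchronous RMI replies to the task that is waiting for them."""

    def __init__(self, ctag, find_task):
        self.ctag = ctag
        self.find_task = find_task  # callable: task_id -> task or None

    def dispatch(self, reply):
        # Ignore replies that were not sent with our correlation tag.
        if reply.get("ctag") != self.ctag:
            return False
        task = self.find_task(reply["task_id"])
        if task is None:
            return False
        # Update the task with the returned value or the exception.
        if "exception" in reply:
            task.failed(reply["exception"])
        else:
            task.succeeded(reply.get("retval"))
        return True
```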

Third, I extended the pulp.server.tasking.task.Task as AsyncTask. Unlike
its superclass, it expects its callable to be asynchronous. Further,
when the task runs the callable, the task's state is not advanced in
anticipation that its succeeded() or failed() methods will be called by
an external event.
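
To make the difference from the superclass concrete, here is a small sketch. The Task class below is a simplified, hypothetical model (the state names and the run() method are assumptions), not pulp.server.tasking.task.Task itself. Marking the task failed when the callable raises while sending is also an assumption on my part.

```python
class Task:
    """Simplified synchronous task: run() drives it to a final state."""

    def __init__(self, callable_, *args):
        self.callable = callable_
        self.args = args
        self.state = "waiting"
        self.result = None
        self.exception = None

    def run(self):
        self.state = "running"
        try:
            result = self.callable(*self.args)
        except Exception as e:
            self.failed(e)
        else:
            self.succeeded(result)

    def succeeded(self, result):
        self.result = result
        self.state = "finished"

    def failed(self, exception):
        self.exception = exception
        self.state = "error"


class AsyncTask(Task):
    """Expects an asynchronous callable; run() leaves the state at 'running'."""

    def run(self):
        self.state = "running"
        try:
            # The callable only sends the request; its return value is ignored.
            self.callable(*self.args)
        except Exception as e:
            # Assumption: an error while sending counts as a failure.
            self.failed(e)
        # No call to succeeded() here: an external event completes the task.
```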

Last, and most significant, is how the API classes leverage (1-3). The
API methods such as ConsumerApi.installpackages() need to do something
on the agent and then update consumer history if the agent (RMI) was
successful. This is what makes this kind of tricky. So, I decided to
implement this by subclassing AsyncTask as
pulp.server.api.consumer.InstallPackages. This specialized task is
created/enqueued and returned by ConsumerApi.installpackages(), giving
InstallPackages.install() as the task's target callable. When invoked,
it sends the RMI. Then, when the reply comes in, Task.succeeded() is
called by the queue. This method is overridden in InstallPackages, which
calls super.succeeded() and then updates the consumer history.

It looks like this:

WS(controller)-->ConsumerApi.installpackages()-->FIFOQueue.enqueue(task)
[ task runs Task.callable() ]
InstallPackages.install()-->AsyncAgent.packages.install()
[ RMI to agent ]
[ agent reply received on QPID queue ]
ReplyListener.succeeded(reply)
-->FIFOQueue.find(taskid)
-->InstallPackages.succeeded(reply)
-->Super.succeeded(reply)
-->ConsumerHistoryApi.packages_installed()
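
The call chain above can be sketched end to end as follows. Everything here is a simplified model: the AsyncTask base, the send_rmi callable, and the history list are hypothetical stand-ins for the real AsyncTask, AsyncAgent, and ConsumerHistoryApi. What it illustrates is the override: succeeded() first completes the task and only then records consumer history.

```python
class AsyncTask:
    """Simplified base: run() sends the request and leaves the task running."""

    def __init__(self, callable_):
        self.callable = callable_
        self.state = "waiting"
        self.result = None

    def run(self):
        self.state = "running"
        self.callable()

    def succeeded(self, result):
        self.result = result
        self.state = "finished"


class InstallPackages(AsyncTask):
    """Task whose target callable is its own install() method."""

    def __init__(self, consumer_id, names, send_rmi, history):
        super().__init__(self.install)
        self.consumer_id = consumer_id
        self.names = names
        self.send_rmi = send_rmi  # stand-in for AsyncAgent.packages.install()
        self.history = history    # stand-in for ConsumerHistoryApi

    def install(self):
        # Fire the RMI; the reply arrives later through the reply listener.
        self.send_rmi(self.consumer_id, self.names)

    def succeeded(self, result):
        # Complete the task first, then record what was installed.
        super().succeeded(result)
        self.history.append((self.consumer_id, "packages_installed", result))
```

History is written only on the success path, so a failed or never-answered RMI leaves the consumer history untouched.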



Is the assumption that the task is stored in either the DB or a user
session?

Currently, tasks are persisted using an in-memory store. Eventually, we need to add a non-volatile store so tasks are preserved across reboot/restart.

Also... how does this pattern change if I want to install
packages at noon next Tuesday?

The messaging and agent frameworks already support the concept of execution (or maintenance) "windows" for RMI calls. That is, asynchronous RMI with a date/time window specified.

So, we have two choices:

1) Dispatch through the task subsystem as defined here but include a parameter to
   specify the window as part of the RMI call to the agent.

2) Introduce task scheduling into the task subsystem.
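
A rough sketch of how the two choices differ, with all names hypothetical (build_request, the "window" keys, and ScheduledQueue are illustrations, not the existing messaging or tasking API). In option 1 the window travels with the RMI and the agent decides when to run. In option 2 the server holds the task until it is due.

```python
import heapq
from datetime import datetime, timedelta


def build_request(method, args, window=None):
    """Option 1: attach the execution window to the RMI request itself."""
    request = {"method": method, "args": args}
    if window is not None:
        begin, duration = window
        request["window"] = {
            "begin": begin.isoformat(),
            "duration_s": duration.total_seconds(),
        }
    return request


class ScheduledQueue:
    """Option 2: the task subsystem keeps tasks until their start time."""

    def __init__(self):
        self._heap = []
        self._count = 0  # tie-breaker so tasks themselves are never compared

    def enqueue(self, task, run_at):
        heapq.heappush(self._heap, (run_at, self._count, task))
        self._count += 1

    def due(self, now):
        # Pop and return every task whose start time has been reached.
        ready = []
        while self._heap and self._heap[0][0] <= now:
            ready.append(heapq.heappop(self._heap)[2])
        return ready


# "Noon next Tuesday" relative to this thread's date (2010-09-23).
noon = datetime(2010, 9, 28, 12, 0)
request = build_request("packages.install", ["zsh"], (noon, timedelta(hours=1)))
```

With option 1 the task is dispatched immediately and simply stays open until the agent replies inside the window. With option 2 nothing is sent until the queue releases the task.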


-- bk




