WIP can be viewed here: https://review.openstack.org/#/c/32036/
Currently part of TaskFlow project: http://www.github.com/stackforge/taskflow
Backed with Celery Task Queue: http://www.celeryproject.org
A task is a thin wrapper over a Celery task object. Each task has a required 'apply' method (what the task will _do_) and an optional 'rollback' method (executed if the task fails). A task can also declare optional 'provides' and 'requires' sections, which define what it needs in order to execute and what it makes available after a successful execution. Finally, a task can define a 'retry' limit, which specifies how many times it should retry before it reports an error.
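To make the task contract concrete, here is a minimal sketch in plain Python. It is not the TaskFlow or Celery API: the `Task` class, its constructor arguments, and its `run` method are hypothetical names chosen for illustration, and the sketch assumes that a 'retry' limit of N means one initial attempt plus up to N retries.

```python
class Task:
    """Hypothetical stand-in for the task wrapper (not the TaskFlow API)."""

    def __init__(self, apply, rollback=None, provides=None, requires=(), retry=0):
        self.apply = apply            # required: what the task does
        self.rollback = rollback      # optional: called once the task has failed
        self.provides = provides      # optional: name under which the result is published
        self.requires = tuple(requires)  # optional: names of inputs the task needs
        self.retry = retry            # assumed meaning: retries allowed after the first attempt

    def run(self, **inputs):
        missing = [name for name in self.requires if name not in inputs]
        if missing:
            raise KeyError("missing required inputs: %s" % ", ".join(missing))
        failures = 0
        while True:
            try:
                result = self.apply(**inputs)
                break
            except Exception:
                failures += 1
                if failures > self.retry:
                    # Retry budget exhausted: roll back, then report the error.
                    if self.rollback is not None:
                        self.rollback(**inputs)
                    raise
        return {self.provides: result} if self.provides else {}


calls = []

def flaky_double(x):
    # Fails twice, then succeeds, to exercise the retry path.
    calls.append(x)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return x * 2

double = Task(flaky_double, provides="doubled", requires=("x",), retry=2)
result = double.run(x=21)
```

In this sketch the rollback runs only after the retry budget is spent; whether the real implementation rolls back per attempt or once at the end is not stated in this page.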
Workflows are a collection of tasks and their relationships. Workflows establish task relationships by registering tasks as listeners to other tasks through the DSTClient.
The DSTClient (short for Distributed State Task Client) is in charge of registering and notifying task listeners. In the below diagram you can see how the DSTClient would wire up listeners for the given workflow.
HOW IT WORKS
1. When task1 is linked to task2, a request is sent to the DSTClient to register task2 as a listener for task1. The client creates a new exchange on the AMQP Bus for the notifying task (if one does not already exist) and registers the listening task as a listener for this exchange.
2. The program executes a workflow, which will trigger all tasks without a 'requires' component to fire. This execution happens by sending the root tasks to the AMQP Bus.
3. Celery workers listening to the AMQP Bus will pick up and execute all tasks posted to the Bus. It is important to note here that the Celery workers can be hosted on the same machine that runs the program, or on a separate, dedicated host to reduce processing overhead.
4. When a task succeeds or errors, the Celery worker operating on that task will broadcast a signal with information describing why the signal was triggered (e.g. error message, successful result data, etc.).
5. These signals will then, in turn, send their data up to the DSTClient.
6. The DSTClient will notify the exchange that the signal specified (identified by the task id in the signal) that the task has changed. If all of a listener's 'requires' have been satisfied, the listener will then execute its callback (which is just your defined task).
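The six steps above can be modelled in-process without Celery or AMQP. The following is a rough sketch under stated assumptions: `InMemoryDSTClient` and all of its methods are hypothetical names, plain dictionaries stand in for exchanges, a direct function call stands in for a worker, and only the success path is modelled (error signals are left out).

```python
from collections import defaultdict


class InMemoryDSTClient:
    """Hypothetical in-process model of the DSTClient; dicts stand in for exchanges."""

    def __init__(self):
        self.exchanges = defaultdict(list)  # notifier name -> listener names
        self.requires = defaultdict(set)    # listener name -> names it waits on
        self.callbacks = {}                 # task name -> callable
        self.results = {}                   # task name -> result of a successful run

    def add_task(self, name, callback):
        self.callbacks[name] = callback

    def register_listener(self, notifier, listener):
        # Step 1: create the exchange if needed and attach the listener.
        self.exchanges[notifier].append(listener)
        self.requires[listener].add(notifier)

    def run(self):
        # Step 2: fire every root task, i.e. every task with no requirements.
        for name in list(self.callbacks):
            if not self.requires[name]:
                self._execute(name, {})

    def _execute(self, name, inputs):
        # Steps 3-5: a "worker" runs the task and signals the result back.
        self.results[name] = self.callbacks[name](**inputs)
        self.notify(name)

    def notify(self, notifier):
        # Step 6: run each listener whose requirements are now all satisfied.
        for listener in self.exchanges[notifier]:
            needed = self.requires[listener]
            if all(dep in self.results for dep in needed):
                self._execute(listener, {dep: self.results[dep] for dep in needed})


client = InMemoryDSTClient()
client.add_task("add", lambda: 1 + 2)
client.add_task("mult", lambda add: add * 3)
client.register_listener("add", "mult")
client.run()
```

Here each listener receives the results of the tasks it requires as keyword arguments named after those tasks, which is one possible reading of how 'provides' and 'requires' line up.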
Below is a very basic example of the distributed task system's language and usage.
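    # The result of this task is published under the name 'add'.
    @task.provides('add')
    def add(x, y):
        return x + y

    # This task waits for 'add' and receives its result as the first argument.
    @task.requires('add')
    def mult(add, y):
        return add * y

    flo = wf('My-distributed-flow')
    # .s() builds a signature holding the arguments that are known up front.
    add_task = add.s(1, 2)
    mult_task = mult.s(3)
    flo.add(add_task)
    # Register mult_task as a listener of add_task.
    flo.link(add_task, mult_task)
    flo.run()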
Because you link tasks together before you run a workflow, the initial workflow graph is fully defined ahead of execution. Although it isn't necessary (see EVENT DRIVEN), you can therefore pre-process your workflow, for example by checking for cyclical dependencies.
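As an illustration of such a pre-processing step, the sketch below detects a cyclical dependency with a depth-first search. The `find_cycle` function and the `links` mapping (task name to the names of the tasks that listen to it) are hypothetical and not part of TaskFlow; the recursion is fine for small graphs but would need an explicit stack for very deep ones.

```python
def find_cycle(links):
    """Return one cycle as a list of task names, or None if the graph is acyclic.

    `links` maps each task name to the names of the tasks that listen to it.
    """
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, fully explored
    colour = {}
    path = []

    def visit(node):
        colour[node] = GREY
        path.append(node)
        for listener in links.get(node, ()):
            state = colour.get(listener, WHITE)
            if state == GREY:
                # The listener is already on the current path: a cycle.
                return path[path.index(listener):] + [listener]
            if state == WHITE:
                cycle = visit(listener)
                if cycle:
                    return cycle
        path.pop()
        colour[node] = BLACK
        return None

    for node in list(links):
        if colour.get(node, WHITE) == WHITE:
            cycle = visit(node)
            if cycle:
                return cycle
    return None


acyclic = find_cycle({"add": ["mult"], "mult": []})
cyclic = find_cycle({"a": ["b"], "b": ["c"], "c": ["a"]})
```

With these inputs, `acyclic` is `None` and `cyclic` is the path `a, b, c, a`.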
Using a distributed approach, it is possible to create a persistent workflow. As an example, imagine that you want to define a task whose sole purpose is to add an IP address to a load balancer as soon as a new server is brought up in a defined cluster. You want your task to sit idle until it is notified that it has a new IP to add. In a traditional workflow approach, this would be impossible, or at least very difficult, to implement, because once a task has completed it cannot be re-entered unless the entire workflow is re-kicked. With our distributed approach, that "adder" task is simply a callback for a listener registered to a queue. It will persist indefinitely and will execute every time it receives a notification to do so.
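The persistent "adder" can be pictured with a small in-process sketch. Everything here is hypothetical: the `Exchange` class stands in for a real AMQP exchange, `add_ip` stands in for the adder task, and a Python list stands in for the load balancer.

```python
class Exchange:
    """Hypothetical in-process stand-in for an AMQP exchange."""

    def __init__(self):
        self.listeners = []

    def register(self, callback):
        self.listeners.append(callback)

    def publish(self, message):
        # Every notification re-runs each registered callback.
        for callback in list(self.listeners):
            callback(message)


load_balancer_pool = []

def add_ip(ip):
    # The "adder" task: it stays registered and runs once per notification.
    load_balancer_pool.append(ip)

new_server = Exchange()
new_server.register(add_ip)

# Two servers come up at different times; the same task handles both.
new_server.publish("10.0.0.5")
new_server.publish("10.0.0.6")
```

The callback is never consumed by running, so no part of the workflow has to be restarted for the second notification.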
The event-driven nature of the distributed workflow is perhaps its greatest feature, as it allows a workflow to be modified at runtime. This is an advantage for a number of reasons.
Firstly, this can remove the need for pre-processing of a workflow. If, as a developer, you have a need to quickly write and push out a workflow, you might not be concerned with (or at least don't want to deal with) any sort of pre-processing computation. If you make a mistake in your flow because of this, you can correct your flow mid-run. For instance, if you have an undetected cyclical dependency, you can simply un-register a task, or change a task's listener, and the flow will then automatically move past this error.
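A minimal sketch of this kind of mid-run correction, assuming a registry that tracks which tasks listen to which: the `ListenerRegistry` class, its methods, and the task names `fetch` and `parse` are all hypothetical. A task counts as ready when every task it listens to has completed.

```python
from collections import defaultdict


class ListenerRegistry:
    """Hypothetical registry of notifier -> listener relationships."""

    def __init__(self):
        self._listeners = defaultdict(set)

    def register(self, notifier, listener):
        self._listeners[notifier].add(listener)

    def unregister(self, notifier, listener):
        self._listeners[notifier].discard(listener)

    def requirements(self, task):
        # Every notifier that the given task is listening to.
        return {n for n, ls in self._listeners.items() if task in ls}

    def ready(self, task, done):
        # A task may run once all of its notifiers have completed.
        return self.requirements(task) <= set(done)


registry = ListenerRegistry()
registry.register("fetch", "parse")
registry.register("parse", "fetch")  # mistake: this creates a cycle

# With the cycle in place, neither task can ever start.
blocked_before = not registry.ready("fetch", done=set())

# Correct the flow mid-run by un-registering the bad listener.
registry.unregister("parse", "fetch")
ready_after = registry.ready("fetch", done=set())
```

After the un-register call, `fetch` has no outstanding requirements and the flow can proceed, with `parse` still waiting on `fetch` as intended.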
The main idea here is that your environment may be dynamic: things can change mid-run, and you want to handle them on the fly without having to stop, tear down, and re-kick an entire workflow. This is not an entirely new idea. Many articles have been written attesting to the importance and value of a task system that can change and adapt while running in an unpredictable environment. (For a great example of a distributed task system being used in the field of robotics, see http://www.cs.cmu.edu/~mberna/mrrg/papers/p860-ortiz.pdf)