Jump to: navigation, search

Difference between revisions of "DistributedTaskManagement"

Line 4: Line 4:
'''Backed with Celery Task Queue: http://www.celeryproject.org'''
'''Backed with Celery Task Queue: http://www.celeryproject.org'''

Revision as of 21:32, 8 January 2016

WIP can be viewed here: https://review.openstack.org/#/c/32036/

Currently part of TaskFlow project: http://www.github.com/stackforge/taskflow

Backed with Celery Task Queue: http://www.celeryproject.org




A task is a thin wrapper over a celery task object. Tasks have a required 'apply' method (what this task will _do_), and an optional 'rollback' method (to be executed on task failure if specified). Each task also has an optional 'provides' and 'requires' section to define, if necessary, what a task requires to execute, and can provide after a successful execution. Tasks can also define a 'retry' limit, to specify how many times this task should retry before it reports an error.


Workflows are a collection of tasks and their relationships. Workflows establish task relationships by registering tasks as listeners to other tasks through the DSTClient.


The DTClient (short for Distributed Task Client) is in charge of registering and notifying task listeners. In the below diagram you can see how the DTClient would wire up listeners for the given workflow.

Exchange Details



1. When a task1 is linked to task2, a request is sent to the DTClient to register task2 as a listener for task1. The Client creates a new exchange on the AMQP Bus (if one does not already exist) for the new notifying task, and registers the listening task as a listener for this exchange.

2. The program executes a workflow, which will trigger all tasks without a 'requires' component to fire. This execution happens by sending the root tasks to the AMQP Bus.

3. Celery workers listening to the AMQP Bus will pick up and execute all tasks posted to the Bus. It is important to note here that the celery works can be hosted on the same machine running the program, or on a separate, dedicated host to reduce processing overhead.

4. If a tasks succeeds or errors, the celery worker operating on that task will broadcast a signal with information describing why the signal was triggered (i.e. error message, successful result data, etc)

5. These signals will then, in turn, send their data up to the DTClient.

6. The DTClient will notify whatever exchange the signal specified (task id from signal) that that task has changed. If all of a listener's 'requires' have been satisfied, the listener will then execute its' callback (which is just your defined task).


Taskflow, by default, does not include some of the tools necessary to run a distributed flow. This is for your convenience, as we try to minimize the number of imports you must use. So, here is a step by step guide to get your distributed environment set up and customized!

1. Install celery

   pip install celery

2. Choose a broker to use. Celery uses kombu, a wrapper over any amqp compliant queue, so you have a lot of choices here. I personally use rabbit as it's fully supported by celery, and I really like its management UI. To see a complete list of supported brokers and how to install them/wire them up with Celery, look here - http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#choosing-a-broker

3. Write your workflow! It must be noted here that there is one special exception that sets distributed apart from traditional flows. Celery Workers only run code that is installed on themselves. Therefore, your task code must be installed on the worker. The easiest convention for doing this is as follows:

   import celery
   from taskflow import task
   def myTask(*args, **kwargs):
       return my_results
   flow_task = task.FunctorTask(execute=myTask)

The celery decorator lets the worker know that this is a method that needs to be pulled in and installed. Alternatively, you can manually install task code onto a worker, but it's much more work than the above described syntax. Since distributed is just a different backend to taskflow, the syntax for creating a flow is the same!

   from taskflow import task
   def mix_ingredients(*args, **kwargs):
       for ingredient in **kwargs:
   def choose_pan(*args, **kwargs):
       return {'selection': pan_selection()}
   def heat_oven(*args, **kwargs):
       pan_selection = kwargs['selection']
       return oven
   def bake(*args, **kwargs):
       for oven in kwargs:
   from taskflow.patterns import graph_flow
   wf = graph_flow.Flow("bake-cake-flow")
   one = task.FunctorTask(execute=mix_ingredients)
   two = task.FunctorTask(execute=choose_pan, provides=['first_pan'])
   three = task.FunctorTask(execute=choose_pan, provides=['second_pan'])
   four = task.FunctorTask(execute=heat_oven, requires=['first_pan'])
   five = task.FunctorTask(execute=heat_oven, requires=['second_pan'])
   six = task.FunctorTask(execute=bake)
   wf.add(one, two, three, four, five, six)
   wf.link(one, two)
   wf.link(one, three)
   wf.link(four, six)
   wf.link(five, six)

And we've made a cake! So, some things to note here. You can either just add tasks to a workflow, or link them together. When a task is just added to a flow, the engine will attempt to determine task relations based on provides/requires data. If you explicitly link two tasks together, the engine no longer cares about their provides/requires relationship. It just knows that in flow.link(1, 2) 2 is to execute as soon as 1 has entered a SUCCESS state.

4. Initialize your distributed engine. Since engines are pluggable backends, a flow needs to specify what engine it'd like to be run on. In addition, we also want to initialize our backend and persistence settings. In the following example I've set up a basic distributed engine that operates against a memory backend, where _flow_ is referring to the bake a cake flow we defined above.

   backend = impl_memory.MemoryBackend({})
   lb = logbook.LogBook("my-log-book")
   fd = utils.create_flow_detail(flow, book=lb, backend=backend)
   storage = task_storage.Storage(fd)
   engine = distributed_engine.DistributedEngine(flow, storage)

5. Kick your Celery workers. So, in order for tasks to be executed, we actually need to have live workers attached to the broker we previously set up. Workers are highly customizable. You can specify how many tasks they are to pick up at a time, where they should run (a dedicated VM, your local machine, across a combination of multiple environments, etc) how to autoscale and many other things. See specifics here - http://docs.celeryproject.org/en/latest/userguide/workers.html. All settings that need to be customized are to be added to taskflow.engines.dist_engine.celeryconfig Once we've customized our celery environment to our liking, we send the signal to start the worker

   celery worker --app=distributed_engine --loglevel=info

Note here as well that the worker must have access to distributed_engine.py in order to hook itself to the celery app. Also, you must let the celery config know what modules contain tasks that you want to pull in. See CELERY_IMPORTS in celeryconfig.py

6. Last thing to do is actually run your flow!


And that's a quick overview. To see complete examples checkout taskflow.tests.unit.test_distributed_engine, and try running some of those examples yourself!



Because you link multiple tasks together before you run a workflow, your initial workflow graph will be defined. Although it isn't necessary (see EVENT DRIVEN), you could perform pre-processing of your workflow (such as checking for cyclical dependencies).


Using a distributed approach, it is possible to create a persistent workflow. As an example, imagine that you want to define a task who's sole purpose is to add an IP address to a load balancer as soon as a new server is popped in a defined cluster. You want your task to persist idly until it is notified that it has a new IP to add. In a traditional workflow approach, this would be impossible, or at least very difficult, to implement, as once a task has completed it will not be re-entrant unless the entire workflow is re-kicked. With our distributed approach, that "adder" task is simply a callback for a listener registered to a queue. It will exist infinitely, and will execute every time it receives a notification to do so.


The event-driven nature of the distributed workflow is perhaps its' greatest feature, as this allows for modification of a workflow during runtime. This is an advantage for a great amount of reasons.

Firstly, this can remove the need for pre-processing of a workflow. If, as a developer, you have a need to quickly write and push out a workflow, you might not be concerned with (or at least don't want to deal with) any sort of pre-processing computation. If you make a mistake in your flow because of this, you can correct your flow mid-run. For instance, if you have an undetected cyclical dependency, you can simply un-register a task, or change a task's listener, and the flow will then automatically move past this error. Also, in a traditional workflow approach, a cyclical dependency would eventually crash your box as you suck up all available resources. In a distributed system, a cyclical flow could continue infinitely, as each resource (memory/process/etc) being used is released as soon as one of your task completes.

Cylical Dependency Fix Mid-Run

Secondly, it might become necessary to change a task mid-run. Imagine a task that pulls down the latest chef recipes for a wordpress install, and installs those recipes on a given server. What happens if, mid-run, that server happens to crash? You don't want to have to shut down and restart your entire workflow. You want to be able to kick a new server, and have your wordpress task point to that new server, and pick up from where it left off (without having to re-pull or re-configure any chef data). Distributed makes it possible to define a new task (create server2), and change our wordpress task to listen to the new server creation. Alternatively, you could set a retry for your failed server task, so that your task can try to execute again before erroring. There might also be a time where you forgot to add a task before runtime, or decided that you need to add a task during runtime. Using our kick server -> install wordpress example, imagine that you either forgot to, or later decided that you wanted to, register a monitor onto your server. Instead of having to shut down your workflow, add this step, then re-start the flow, you could simply create your new add_monitor task, and register it as a listener to kick_server.

The main idea here is that you may have a dynamic environment. Things could change mid-run that you want to be able to handle on the fly without having to stop, tear down, and re-kick an entire workflow. This is not an entirely new idea. Many articles have been written attesting to the importance and value of a task system that can change and adapt while running in an unpredictable environment. (Here's a link to great example of a distributed task system being used in the field of robotics if you have the time or interest to read it http://www.cs.cmu.edu/~mberna/mrrg/papers/p860-ortiz.pdf)