
TaskFlow/Patterns and Engines Documentation

Revision as of 08:50, 3 September 2013 by Akarpinska (talk | contribs)


Patterns describe the structure of a flow. A pattern defines the order in which its subblocks (other patterns or task blocks) are executed. A task block is a block that describes a task. A task is a user-defined class that can be executed and reverted.

A task block specifies how to execute a user-defined task, which parameters it should receive, and which data should be saved.


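   from taskflow import blocks, task  # import path assumed; adjust to your TaskFlow version

   class Adder(task.Task):
       def __init__(self, name):
           super(Adder, self).__init__(name)

       def __call__(self, x, y):
           # Executing the task returns the sum of its parameters.
           return x + y

       def revert(self, x, y):
           print("Revert adder")

   # Wrap the task in a block and save its result under the key 'z'.
   flow = blocks.Task(Adder("add"), save_as='z')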

This example describes an adder that receives the 'x' and 'y' parameters and saves its result as 'z'.

The LinearFlow pattern runs subblocks sequentially, one after another.
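
To make the data flow concrete, here is a minimal stand-alone sketch that does not use TaskFlow itself. The `run_task` helper and the plain-dict `storage` are hypothetical stand-ins for what a task block and an engine might do: look up each parameter of the task by name, call the task, and save the result under the `save_as` key.

```python
import inspect


class Adder(object):
    """Stand-in for the Adder task (no TaskFlow dependency)."""

    def __init__(self, name):
        self.name = name

    def __call__(self, x, y):
        return x + y


def run_task(task, storage, save_as):
    # Hypothetical helper: read each parameter of the task from storage by name.
    names = list(inspect.signature(task.__call__).parameters)
    kwargs = {n: storage[n] for n in names}
    # Call the task and save its result under the save_as key.
    storage[save_as] = task(**kwargs)
    return storage


storage = run_task(Adder("add"), {'x': 5, 'y': 7}, save_as='z')
print(storage['z'])
```

The real block presumably does more (for example, it also knows how to revert), so treat this only as an illustration of the naming convention.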


   flow = blocks.LinearFlow().add(blocks.Task(Adder("add"), save_as='z1'),
                                  blocks.Task(Adder("add"), save_as='z2'),
                                  blocks.Task(Adder("add"), save_as='z3', rebind_args=['z1', 'z2']))

This example describes three tasks. The first two adders each receive the 'x' and 'y' parameters and save their results as 'z1' and 'z2'. The third task receives 'z1' and 'z2' and saves its result as 'z3'.

The ParallelFlow pattern runs subblocks in parallel and waits for all of them to complete.
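
The sequential behaviour can be sketched without TaskFlow as follows. The tuple layout of `steps` and the `run_linear` helper are assumptions made for this illustration, not part of the library; the point is only that each step runs after the previous one, so the third step can read what the first two saved.

```python
def add(x, y):
    return x + y


# Each step: (callable, storage keys to read, storage key to write).
# This tuple layout is an assumption made for the sketch.
steps = [
    (add, ('x', 'y'), 'z1'),
    (add, ('x', 'y'), 'z2'),
    (add, ('z1', 'z2'), 'z3'),
]


def run_linear(steps, storage):
    # Run the steps strictly in order, so later steps can read
    # results that earlier steps saved.
    for func, arg_keys, save_as in steps:
        storage[save_as] = func(*(storage[k] for k in arg_keys))
    return storage


result = run_linear(steps, {'x': 5, 'y': 7})
print(result['z1'], result['z2'], result['z3'])
```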


   flow = blocks.ParallelFlow().add(blocks.Task(Adder("add"), save_as='z1'),
                                    blocks.Task(Adder("add"), save_as='z2'),
                                    blocks.Task(Adder("add"), save_as='z3'))

This example describes three tasks that will be run in parallel. Each task accepts the 'x' and 'y' parameters; the results are saved as 'z1', 'z2' and 'z3'.
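
As a rough illustration of "run in parallel and wait for completion", the sketch below submits three independent additions to a `multiprocessing.pool.ThreadPool` and collects the results. The `run_parallel` helper and the dict used as storage are hypothetical; this is not how TaskFlow is implemented, only one way the idea could look.

```python
from multiprocessing.pool import ThreadPool


def add(x, y):
    return x + y


def run_parallel(save_keys, storage, pool):
    # Submit one add(x, y) call per result key without waiting in between.
    pending = {key: pool.apply_async(add, (storage['x'], storage['y']))
               for key in save_keys}
    # Wait for every call to complete, then save the results.
    for key, async_result in pending.items():
        storage[key] = async_result.get()
    return storage


pool = ThreadPool(processes=3)
try:
    result = run_parallel(['z1', 'z2', 'z3'], {'x': 5, 'y': 7}, pool)
finally:
    pool.close()
    pool.join()
print(result)
```

Because the three tasks read only 'x' and 'y' and write distinct keys, they do not depend on one another, which is what makes running them in parallel safe here.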

Building the engine

An engine executes a flow. To do so, it converts flow blocks to actions; the implementation of an action defines how the corresponding block is executed. The engine accepts a dictionary that specifies which action should be built from each block type. The following example creates a single-threaded engine, in which all tasks are executed sequentially in a single thread.

   engine = SingleThreadedActionEngine(flow)
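
The block-to-action conversion can be pictured with the following self-contained sketch. Every class name here (`TaskBlock`, `LinearBlock`, `TaskAction`, `SequentialAction`, `SketchEngine`) is hypothetical and invented for illustration; the only idea taken from the text is that a dictionary maps each block type to the action built from it.

```python
class TaskBlock(object):
    def __init__(self, func, save_as):
        self.func = func
        self.save_as = save_as


class LinearBlock(object):
    def __init__(self, *children):
        self.children = children


class TaskAction(object):
    def __init__(self, block, engine):
        self.block = block

    def execute(self, storage):
        # Read 'x' and 'y' from storage and save the result.
        result = self.block.func(storage['x'], storage['y'])
        storage[self.block.save_as] = result


class SequentialAction(object):
    def __init__(self, block, engine):
        # Convert every child block to its own action.
        self.actions = [engine.to_action(c) for c in block.children]

    def execute(self, storage):
        for action in self.actions:
            action.execute(storage)


class SketchEngine(object):
    def __init__(self, flow, action_map):
        self.action_map = action_map
        self.storage = {}
        self.root = self.to_action(flow)

    def to_action(self, block):
        # The dictionary decides which action is built for a block type.
        return self.action_map[type(block)](block, self)

    def run(self):
        self.root.execute(self.storage)


flow = LinearBlock(TaskBlock(lambda x, y: x + y, 'z1'),
                   TaskBlock(lambda x, y: x * y, 'z2'))
engine = SketchEngine(flow, {TaskBlock: TaskAction,
                             LinearBlock: SequentialAction})
engine.storage = {'x': 5, 'y': 7}
engine.run()
print(engine.storage)
```

Swapping `SequentialAction` for an action that uses a thread pool would change how the same flow is executed without touching the flow definition, which is the apparent motivation for keeping blocks and actions separate.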

A multithreaded engine can be used to run tasks in parallel. This engine uses multiprocessing.pool.ThreadPool by default, but any other thread pool can be passed as a parameter.

   engine = MultiThreadedActionEngine(flow, thread_pool=my_pool)


The engine storage contains all data produced by the flow. Initial parameters for the tasks can also be saved to the storage before the flow starts.

   engine.storage = {'x': 5, 'y': 7}

By default, a task receives its parameters from the storage, and each task parameter name is the same as the storage key it is read from. If a task should receive values stored under other names, the parameter binding must be described explicitly. Take the Adder task as an example.

   flow = blocks.LinearFlow().add(blocks.Task(Adder("add1"), save_as='z1'),
                                  blocks.Task(Adder("add2"), save_as='z2'),
                                  blocks.Task(Adder("add3"), save_as='z3', rebind_args=['z1', 'z2']))

The 'add3' task requires 'x' and 'y' as parameters, but we want it to use the results of 'add1' and 'add2' instead. The following parameter specifies which stored data should be used.

   rebind_args=['z1', 'z2']

If we want to replace some parameters, but not all, a partial mapping can be used. In the following case 'z1' will be passed as the 'y' parameter, while 'x' is still read from the storage under its own name.

   rebind_args={'y': 'z1'}
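
The three binding forms (default, list, and partial mapping) can be compared side by side with a small sketch. The `resolve_args` function is hypothetical and not part of TaskFlow, and it assumes that the list form is positional, so that the first key is bound to the first parameter and so on.

```python
def resolve_args(param_names, rebind_args=None):
    """Map each task parameter name to the storage key it is read from."""
    # Default: each parameter is read from the key with the same name.
    mapping = {name: name for name in param_names}
    if isinstance(rebind_args, dict):
        # Partial mapping: override only the listed parameters.
        mapping.update(rebind_args)
    elif rebind_args is not None:
        # List form: assumed positional (first key for first parameter).
        mapping.update(zip(param_names, rebind_args))
    return mapping


storage = {'x': 5, 'y': 7, 'z1': 12, 'z2': 12}
for rebind in (None, ['z1', 'z2'], {'y': 'z1'}):
    mapping = resolve_args(['x', 'y'], rebind)
    kwargs = {name: storage[key] for name, key in mapping.items()}
    print(rebind, '->', kwargs)
```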