TaskFlow/Patterns and Engines Documentation

Patterns

Patterns describe the structure of a flow: they define the order in which subflows (other patterns or tasks) are executed.


A task is a user-defined class that can be executed and reverted.

Example:

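   from taskflow import task

   class Adder(task.Task):
       def __init__(self, name, rebind=None, provides=None, requires=None):
           super(Adder, self).__init__(name, rebind=rebind, provides=provides, requires=requires)

       def __call__(self, x, y):
           # Run by the engine; the result is saved under the 'provides' name.
           return x + y

       def revert(self, x, y):
           print("Revert adder")

   # Created outside the class body; optional arguments default to None.
   flow = Adder("add", provides='z')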

This example defines an adder that receives the 'x' and 'y' parameters and returns their sum, which is stored as 'z'.


Storage and Task Parameters

Engine storage contains all data produced by the flow. Initial parameters for the tasks can also be saved to the storage before the flow starts.

   engine.storage.inject({'x': 5, 'y': 7})

By default, a task receives its parameters from storage, and each parameter name is the same as the storage key. If a task should receive values stored under other names, the parameter binding must be described explicitly. Consider the Adder task as an example.

   flow = linear_flow.Flow().add(Adder("add1", provides='z1'),
                                 Adder("add2", provides='z2'),
                                 Adder("add3", provides='z3', rebind=['z1', 'z2']))

The 'add3' task requires 'x' and 'y' as parameters, but here it should use the results of 'add1' and 'add2' instead. The following parameter tells the task which stored values to use: 'z1' is passed as 'x' and 'z2' as 'y'.

   rebind=['z1', 'z2']

If only some parameters need to be replaced, a partial mapping can be used. In the following case 'z1' is passed as the 'y' parameter, while 'x' is still taken from storage under its own name.

   rebind={'y': 'z1'}
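
The binding rules above can be illustrated with a minimal sketch in plain Python. This is not TaskFlow's implementation: the helper `resolve_arguments` and its signature are hypothetical and only mimic the behaviour described in this section (no rebind, a list rebind applied in argument order, and a partial dictionary rebind).

```python
def resolve_arguments(arg_names, storage, rebind=None):
    """Return a dict mapping each task argument to a value from storage.

    Hypothetical helper for illustration only; not part of TaskFlow.
    """
    if rebind is None:
        # Default: the argument name is the storage key.
        mapping = {}
    elif isinstance(rebind, dict):
        # Partial mapping: only the listed arguments are redirected.
        mapping = dict(rebind)
    else:
        # List form: storage keys are matched to arguments in order.
        mapping = dict(zip(arg_names, rebind))
    return {name: storage[mapping.get(name, name)] for name in arg_names}


# Sample storage contents, chosen only for this illustration.
storage = {'x': 5, 'y': 7, 'z1': 12, 'z2': 20}

default_args = resolve_arguments(['x', 'y'], storage)
list_args = resolve_arguments(['x', 'y'], storage, rebind=['z1', 'z2'])
partial_args = resolve_arguments(['x', 'y'], storage, rebind={'y': 'z1'})
```

With this storage, the list form passes 12 and 20 as 'x' and 'y', while the partial form keeps 'x' at 5 and passes 12 as 'y'.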

To save a task result to storage, the result names must be specified with the 'provides' parameter. It can be a string if the task produces a single result, or a list if it produces multiple results.

   provides=['a', 'b']
   provides='x'
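
As a rough illustration of the two forms of 'provides', the sketch below stores results in a plain dictionary. The helper `save_result` is hypothetical and not part of TaskFlow. It also assumes that a task with several results returns them as a sequence in the same order as the names, which the text above does not state.

```python
def save_result(storage, provides, result):
    """Save a task result under the name or names given by `provides`.

    Hypothetical helper for illustration only; not part of TaskFlow.
    """
    if isinstance(provides, str):
        # Single result: stored under one key.
        storage[provides] = result
    else:
        # Assumption: multiple results arrive as a sequence whose order
        # matches the order of the names in `provides`.
        for name, value in zip(provides, result):
            storage[name] = value


storage = {}
save_result(storage, 'x', 12)
save_result(storage, ['a', 'b'], (1, 2))
```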

Additional requirements can be specified with the 'requires' parameter.

   requires=['a', 'b']



The Linear Flow pattern executes subflows sequentially, one after another.

Example:

   flow = linear_flow.Flow().add(Adder("add1", provides='z1'),
                                 Adder("add2", provides='z2'),
                                 Adder("add3", provides='z3', rebind=['z1', 'z2']))

This example describes three tasks. The first two adders each receive the 'x' and 'y' parameters and return 'z1' and 'z2' respectively. The third task receives 'z1' and 'z2' and returns 'z3'.
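
To make the data flow concrete, the following sketch traces the same three steps in plain Python, using the values x=5 and y=7 from the inject example earlier. It only imitates sequential execution with a dictionary as storage; the `steps` table and the `add` function are illustrative stand-ins, not TaskFlow objects.

```python
def add(x, y):
    return x + y


# (task name, storage keys used for x and y, key the result is saved under)
steps = [
    ('add1', ('x', 'y'), 'z1'),
    ('add2', ('x', 'y'), 'z2'),
    ('add3', ('z1', 'z2'), 'z3'),  # corresponds to rebind=['z1', 'z2']
]

storage = {'x': 5, 'y': 7}
order = []
for name, keys, provides in steps:
    # Each step runs only after the previous one has finished.
    storage[provides] = add(*(storage[key] for key in keys))
    order.append(name)
```

After the loop, 'z1' and 'z2' are both 12 and 'z3' is 24.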


The Unordered Flow pattern describes a set of independent subflows that can be executed in parallel.

Example:

   flow = unordered_flow.Flow().add(Adder("add1", provides='z1'),
                                    Adder("add2", provides='z2'),
                                    Adder("add3", provides='z3'))

This example describes three tasks that can be executed in parallel. Each task accepts the 'x' and 'y' parameters, and they return 'z1', 'z2' and 'z3' respectively.


The Graph Flow pattern describes a graph of subflows. Dependencies between them are resolved automatically from the values each subflow provides and requires. Independent subflows can be executed in parallel. Cyclic dependencies in the graph cause an error.

Example:

   flow = graph_flow.Flow().add(Adder("add1", provides='z1'),
                                Adder("add2", provides='z2', rebind=['z1', 'z3']),
                                Adder("add3", provides='z3'))

In this example, tasks 'add1' and 'add3' have no dependencies on other tasks, but 'add2' requires data produced by both of them. This means that 'add1' and 'add3' are executed first, possibly in parallel, and 'add2' is executed only after both have finished.
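
One way to picture this resolution is a small topological sort over the provided and required names. The sketch below is an assumption about how such ordering could be derived, not TaskFlow's actual algorithm. The function `execution_batches` and the dictionary layout are hypothetical. Values that no task provides (here 'x' and 'y') are assumed to be in storage already.

```python
def execution_batches(tasks):
    """Group tasks into batches; tasks in one batch do not depend on each other.

    `tasks` maps a task name to a (requires, provides) pair of name lists.
    Hypothetical helper for illustration only; not part of TaskFlow.
    """
    producers = {}
    for name, (_, provides) in tasks.items():
        for key in provides:
            producers[key] = name
    # A task depends on whichever task produces a value it requires.
    pending = {
        name: {producers[key] for key in requires if key in producers}
        for name, (requires, _) in tasks.items()
    }
    batches = []
    while pending:
        ready = sorted(name for name, deps in pending.items() if not deps)
        if not ready:
            # Every remaining task waits on another one: a cycle.
            raise ValueError("cyclic dependency between: %s" % sorted(pending))
        batches.append(ready)
        for name in ready:
            del pending[name]
        for deps in pending.values():
            deps.difference_update(ready)
    return batches


tasks = {
    'add1': (['x', 'y'], ['z1']),
    'add2': (['z1', 'z3'], ['z2']),
    'add3': (['x', 'y'], ['z3']),
}
batches = execution_batches(tasks)
```

For the example flow this yields two batches: 'add1' and 'add3' together, then 'add2'. Two tasks that require each other's results raise a ValueError instead.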


Building the engine

The engine executes a flow by converting flow patterns to actions; the implementation of an action defines how it is executed. The engine accepts a dictionary that specifies which action should be built from each pattern type. The following example creates a single-threaded engine in which all tasks are executed sequentially in a single thread.

   engine = SingleThreadedActionEngine(flow)
   engine.run()

A multithreaded engine can be used to run tasks in parallel. This engine uses multiprocessing.pool.ThreadPool by default, but any other thread pool can be passed as a parameter.

   engine = MultiThreadedActionEngine(flow, thread_pool=my_pool)
   engine.run()
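
The `my_pool` object in the snippet above is not defined in the text. As a sketch, a pool of the default type mentioned here can be created from the Python standard library as shown below. The pool size of 3 and the `add` function are arbitrary choices for illustration. The example runs plain callables on the pool and does not involve the engine itself.

```python
from multiprocessing.pool import ThreadPool


def add(x, y):
    return x + y


# A standard-library thread pool with three worker threads.
my_pool = ThreadPool(processes=3)
try:
    # Submit three independent calls; they may run concurrently.
    pending = [my_pool.apply_async(add, (5, 7)) for _ in range(3)]
    results = [item.get(timeout=10) for item in pending]
finally:
    # Release the worker threads once the work is done.
    my_pool.close()
    my_pool.join()
```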