TaskFlow/Patterns and Engines Documentation

Patterns

Patterns describe the Flow structure: a pattern defines the order in which its subflows (nested patterns or tasks) are executed.


Task is a user-defined class that can be executed and reverted.

Example:

   from taskflow import task

   class Adder(task.Task):
       def __init__(self, name, rebind=None, provides=None, requires=None):
           super(Adder, self).__init__(name, rebind=rebind, provides=provides, requires=requires)
       def __call__(self, x, y):
           # Executed by the engine; the return value is saved to storage.
           return x + y
       def revert(self, x, y):
           # Called by the engine if the flow fails after this task has run.
           print "Revert adder"

   flow = Adder("add", provides='z')

This example defines an adder that receives the 'x' and 'y' parameters and saves its result as 'z'.
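
The revert hook comes into play when a later task in the flow fails. The following is a minimal sketch of that behaviour: FailingTask and the flow name "demo" are hypothetical, and the SingleThreadedActionEngine and storage injection used here are described later in this document. When the second task raises, the engine rolls back the already-completed Adder, which prints "Revert adder".

   from taskflow.patterns import linear_flow

   class FailingTask(task.Task):
       def __call__(self, z):
           raise RuntimeError("boom")   # forces the engine to roll back

   flow = linear_flow.Flow("demo").add(Adder("add", provides='z'),
                                       FailingTask("fail"))
   engine = SingleThreadedActionEngine(flow)
   engine.storage.inject({'x': 5, 'y': 7})
   try:
       engine.run()          # Adder.revert() is called during rollback
   except Exception:
       pass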


Storage and Task Parameters

The engine's storage contains all data produced by the flow. Initial parameters for the tasks can also be saved to storage before the flow starts.

   engine.storage.inject({'x': 5, 'y': 7})

By default a task receives its parameters from storage, and each task parameter name matches a storage key. If we want to pass values that are stored under different names, a parameter binding must be specified. Take the Adder task as an example.

   flow = (linear_flow.Flow().add(Adder("add1", provides='z1'),
                                  Adder("add2", provides='z2'),
                                  Adder("add3", provides='z3', rebind=['z1', 'z2']))

The 'add3' task takes 'x' and 'y' as parameters, but we want it to use the results of 'add1' and 'add2' instead. The following argument tells the task which storage keys to use.

   rebind=['z1', 'z2']

If we want to remap only some of the parameters, a partial mapping can be used. In the following case 'z1' will be passed as the 'y' parameter.

   rebind={'y': 'z1'}
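
For instance, a hypothetical Scaler task (not part of the original example; shown here only as a sketch) whose parameter names differ from the storage keys could be wired up like this:

   class Scaler(task.Task):
       def __call__(self, value, factor):
           return value * factor

   # Map storage key 'z1' onto the 'value' parameter and 'x' onto 'factor'.
   scale = Scaler("scale", provides='scaled', rebind={'value': 'z1', 'factor': 'x'})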

To save a task's results to storage, the result names must be specified with the 'provides' parameter. It can be a string when the task produces a single result, or a list when it produces multiple results.

  provides=['a', 'b']
  provides='x'
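
For example, a hypothetical DivMod task (an illustration; it assumes that a list of provided names is matched, in order, against the returned sequence) could save two results at once:

   class DivMod(task.Task):
       def __call__(self, x, y):
           # Returns two values; they are stored under the two provided names.
           return x // y, x % y

   div = DivMod("divmod", provides=['quotient', 'remainder'])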

Additional requirements can be specified with the 'requires' parameter.

  requires=['a', 'b']
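
As a sketch (assuming that values listed in 'requires' which do not appear in the call signature are looked up in storage and passed in as keyword arguments), a task can declare extra inputs like this:

   class Reporter(task.Task):
       def __call__(self, **kwargs):
           # 'a' and 'b' are delivered via **kwargs because of 'requires'.
           print "a=%s b=%s" % (kwargs['a'], kwargs['b'])

   reporter = Reporter("report", requires=['a', 'b'])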



The Linear Flow pattern executes its subflows sequentially, one after another.

Example:

   flow = (linear_flow.Flow().add(Adder("add1", provides='z1'),
                                  Adder("add2", provides='z2'),
                                  Adder("add3", provides='z3', rebind=['z1', 'z2'])))

This example describes three tasks. The first two adders will receive both the 'x' and 'y' parameters and return 'z1' and 'z2'. The third task will receive 'z1' and 'z2' and return 'z3'.


The Unordered Flow pattern describes a set of independent subflows that can be executed in parallel.

Example:

   from taskflow.patterns import unordered_flow

   flow = (unordered_flow.Flow().add(Adder("add1", provides='z1'),
                                     Adder("add2", provides='z2'),
                                     Adder("add3", provides='z3')))

This example describes three tasks that will be executed in parallel. Each task accepts the 'x' and 'y' parameters, and together they return 'z1', 'z2' and 'z3'.


The Graph Flow pattern describes a dependency graph. Dependencies between subflows are resolved automatically from their provided and required values. Independent subflows can be executed in parallel; cyclic dependencies in the graph cause an error.

Example:

   from taskflow.patterns import graph_flow

   flow = (graph_flow.Flow().add(Adder("add1", provides='z1'),
                                 Adder("add2", provides='z2', rebind=['z1', 'z3']),
                                 Adder("add3", provides='z3')))

In this example the tasks 'add1' and 'add3' have no dependencies on other tasks, but 'add2' requires the data produced by the other two. This means that 'add1' and 'add3' can be executed first, possibly in parallel, while 'add2' depends on their results and will be executed afterwards.


Building the engine

An engine executes a flow. The engine converts flow patterns into actions, and the implementation of an action defines how it is executed. The engine accepts a dictionary that specifies which action should be built for each pattern type. The following example creates a single-threaded engine, where all tasks are executed sequentially in a single thread.

   engine = SingleThreadedActionEngine(flow)
   engine.run()
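
Putting the pieces together, a complete run might look like the following sketch. It assumes the linear flow built earlier, that engine.storage.fetch() is the way to read a saved result back, and the injected values of 'x' and 'y' are arbitrary.

   engine = SingleThreadedActionEngine(flow)
   engine.storage.inject({'x': 5, 'y': 7})
   engine.run()
   # Read a result back from storage after the run.
   print engine.storage.fetch('z3')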

A multi-threaded engine can be used to run tasks in parallel. It uses multiprocessing.pool.ThreadPool by default, but any other thread pool can be passed as a parameter.

   engine = MultiThreadedActionEngine(flow, thread_pool=my_pool)
   engine.run()
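
For example, a pool with a fixed number of worker threads could be supplied like this (a sketch; the pool size of 4 is arbitrary):

   from multiprocessing import pool

   my_pool = pool.ThreadPool(processes=4)
   engine = MultiThreadedActionEngine(flow, thread_pool=my_pool)
   engine.run()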