TaskFlow/Patterns and Engines Documentation
Last edited by Akarpinska
Latest revision as of 13:44, 19 September 2013
Patterns
Patterns describe the Flow structure. Patterns define an order of execution of subflows (patterns or tasks).
Task is a user-defined class that can be executed and reverted.
Example:
    from taskflow import task

    class Adder(task.Task):
        # Optional arguments default to None so they can be omitted.
        def __init__(self, name, rebind=None, provides=None, requires=None):
            super(Adder, self).__init__(name, rebind=rebind, provides=provides,
                                        requires=requires)

        def __call__(self, x, y):
            return x + y

        def revert(self, x, y):
            print("Revert adder")

    flow = Adder("add", provides='z')
This example describes an adder that will receive 'x' and 'y' parameters and return 'z'.
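The execute-and-revert contract can be illustrated with a minimal sketch in plain Python that does not use TaskFlow. The names `ToyAdder`, `Failing` and `run_with_revert` are hypothetical and exist only for this illustration. The rule that only already-completed tasks are reverted, in reverse order, is an assumption of the sketch; the real engine may behave differently.

```python
class ToyAdder(object):
    """Hypothetical stand-in for a task: callable, with a revert method."""

    def __init__(self, log):
        self.log = log

    def __call__(self, x, y):
        self.log.append("run adder")
        return x + y

    def revert(self, x, y):
        self.log.append("revert adder")


class Failing(object):
    """Hypothetical task that always fails when executed."""

    def __init__(self, log):
        self.log = log

    def __call__(self, x, y):
        raise RuntimeError("boom")

    def revert(self, x, y):
        self.log.append("revert failing")


def run_with_revert(tasks, x, y):
    """Run tasks in order; on failure, revert the completed ones in reverse."""
    done = []
    results = []
    try:
        for current in tasks:
            results.append(current(x, y))
            done.append(current)
    except Exception:
        # Assumption of this sketch: only tasks that finished are reverted.
        for finished in reversed(done):
            finished.revert(x, y)
        return None
    return results


log = []
outcome = run_with_revert([ToyAdder(log), Failing(log)], 5, 7)
```

Here the adder runs, the second task raises, and the adder's `revert` is called before `run_with_revert` returns `None`.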
Storage and Task Parameters
Engine storage contains all data produced by the flow. Initial parameters for the tasks can also be saved to the storage before the flow starts.
engine.storage.inject({'x': 5, 'y': 7})
By default a task receives its parameters from storage, and each task parameter name is the same as the storage key. If we want to pass values stored under other names to the task, the parameter binding has to be described. Take the Adder task as an example.
    flow = linear_flow.Flow().add(
        Adder("add1", provides='z1'),
        Adder("add2", provides='z2'),
        Adder("add3", provides='z3', rebind=['z1', 'z2']))
The 'add3' task requires 'x' and 'y' as parameters, but we want to use the results of 'add1' and 'add2' instead. The following parameter tells which data should be used.
rebind=['z1', 'z2']
If we want to replace some parameters, but not all of them, a partial mapping can be used. In the following case 'z1' will be passed as the 'y' parameter.
rebind={'y': 'z1'}
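The two forms of 'rebind' described above can be modelled in a few lines of plain Python. This is only a sketch of the lookup rule, not TaskFlow's implementation; `resolve_arguments` is a hypothetical helper, and the stored values are made up for the illustration.

```python
def resolve_arguments(arg_names, storage, rebind=None):
    """Return a dict mapping each task argument to a value from storage."""
    if rebind is None:
        # Default: the argument name is also the storage key.
        mapping = {}
    elif isinstance(rebind, dict):
        # Partial mapping: only the listed arguments are redirected.
        mapping = dict(rebind)
    else:
        # List form: storage keys are matched to arguments by position.
        mapping = dict(zip(arg_names, rebind))
    return {name: storage[mapping.get(name, name)] for name in arg_names}


storage = {'x': 5, 'y': 7, 'z1': 12, 'z2': 20}

default_args = resolve_arguments(['x', 'y'], storage)
full_rebind = resolve_arguments(['x', 'y'], storage, rebind=['z1', 'z2'])
partial_rebind = resolve_arguments(['x', 'y'], storage, rebind={'y': 'z1'})
```

With the list form both arguments come from 'z1' and 'z2'; with the dictionary form 'x' is still read from 'x' and only 'y' is read from 'z1'.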
To save a task's result to storage, the result names should be specified with the 'provides' parameter. It can be a string if the task produces only one result, or a list if it produces multiple results.
    provides=['a', 'b']
    provides='x'
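A minimal sketch of how the string and list forms of 'provides' could map a task's return value onto storage keys. The helper `save_result` is hypothetical, and the assumption that a list of names is matched to the returned values by position belongs to this sketch rather than to the original text.

```python
def save_result(storage, result, provides=None):
    """Store a task result under the name or names given by 'provides'."""
    if provides is None:
        # Nothing was declared, so the result is not saved.
        return
    if isinstance(provides, str):
        # Single name: the whole return value is stored under it.
        storage[provides] = result
    else:
        # Several names: assumed to match the returned values by position.
        for name, value in zip(provides, result):
            storage[name] = value


storage = {}
save_result(storage, 12, provides='x')
save_result(storage, (1, 2), provides=['a', 'b'])
save_result(storage, 99)  # no 'provides', so storage is unchanged
```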
Additional requirements can be specified with the 'requires' parameter.
requires=['a', 'b']
Linear Flow pattern executes subflows sequentially, one after another.
Example:
    flow = linear_flow.Flow().add(
        Adder("add1", provides='z1'),
        Adder("add2", provides='z2'),
        Adder("add3", provides='z3', rebind=['z1', 'z2']))
This example describes three tasks. The first two adders each receive the 'x' and 'y' parameters and return 'z1' and 'z2' respectively. The third task receives 'z1' and 'z2' and returns 'z3'.
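The data flow of this example can be traced by hand with a plain-Python sketch that does not use TaskFlow. The `steps` table and the injected values 5 and 7 are assumptions made for the illustration.

```python
def add(x, y):
    return x + y


# (task name, storage key for the result, storage keys used as arguments)
steps = [
    ("add1", "z1", ["x", "y"]),
    ("add2", "z2", ["x", "y"]),
    ("add3", "z3", ["z1", "z2"]),  # rebound to the two earlier results
]

storage = {"x": 5, "y": 7}

# Sequential execution: each step sees everything stored before it.
for name, provides, sources in steps:
    storage[provides] = add(*(storage[key] for key in sources))
```

After the loop, 'z1' and 'z2' both hold 12 and 'z3' holds 24, which is only possible because 'add3' runs after the other two.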
Unordered Flow describes a set of independent subflows that can be executed in parallel.
Example:
    flow = unordered_flow.Flow().add(
        Adder("add1", provides='z1'),
        Adder("add2", provides='z2'),
        Adder("add3", provides='z3'))
This example describes three tasks that will be executed in parallel. These tasks accept 'x' and 'y' parameters and return 'z1', 'z2' and 'z3'.
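Because the three tasks do not depend on each other, they can be handed to a thread pool in any order. The sketch below uses `multiprocessing.pool.ThreadPool` from the standard library directly; it is not how TaskFlow schedules work internally, and the injected values are assumptions.

```python
from multiprocessing.pool import ThreadPool


def add(x, y):
    return x + y


storage = {"x": 5, "y": 7}
result_names = ["z1", "z2", "z3"]

pool = ThreadPool(3)
try:
    # Submit all three additions at once; none needs another's result.
    pending = [
        (name, pool.apply_async(add, (storage["x"], storage["y"])))
        for name in result_names
    ]
    # Collect the results; get() blocks until each call has finished.
    results = {name: handle.get() for name, handle in pending}
finally:
    pool.close()
    pool.join()

storage.update(results)
```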
Graph Flow describes a graph. Graph dependencies can be resolved automatically by subflows' provided and required values. Independent subflows can be executed in parallel. Cyclic dependencies in the graph cause an error.
Example:
    flow = graph_flow.Flow().add(
        Adder("add1", provides='z1'),
        Adder("add2", provides='z2', rebind=['z1', 'z3']),
        Adder("add3", provides='z3'))
In this example tasks 'add1' and 'add3' have no dependencies on other tasks, but 'add2' requires data produced by both of them. This means that 'add1' and 'add3' are executed first, possibly in parallel, and 'add2' is executed after they finish.
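The dependency resolution described for Graph Flow can be sketched as a level-by-level topological ordering over required and provided names. This is a plain-Python illustration under stated assumptions, not TaskFlow's algorithm: `execution_levels` is a hypothetical helper, and names that no task provides (here 'x' and 'y') are assumed to be injected into storage beforehand.

```python
def execution_levels(tasks):
    """Group task names into levels; tasks in one level are independent.

    tasks maps a task name to a (requires, provides) pair of name lists.
    """
    producers = {}
    for name, (requires, provides) in tasks.items():
        for value in provides:
            producers[value] = name

    # A task depends on every task that produces one of its inputs.
    deps = {
        name: {producers[value] for value in requires if value in producers}
        for name, (requires, provides) in tasks.items()
    }

    levels = []
    finished = set()
    while len(finished) < len(tasks):
        ready = sorted(
            name for name in tasks
            if name not in finished and deps[name] <= finished
        )
        if not ready:
            # No task can start, so the remaining ones form a cycle.
            raise ValueError("cyclic dependency between tasks")
        levels.append(ready)
        finished.update(ready)
    return levels


tasks = {
    "add1": (["x", "y"], ["z1"]),
    "add2": (["z1", "z3"], ["z2"]),
    "add3": (["x", "y"], ["z3"]),
}
levels = execution_levels(tasks)
```

For the flow above this yields one level containing 'add1' and 'add3' followed by a level containing 'add2'. Two tasks that require each other's output raise `ValueError`, mirroring the rule that cyclic dependencies cause an error.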
Building the engine
An engine executes a flow by converting flow patterns to actions; the implementation of an action defines how it is executed. The engine accepts a dictionary that specifies which action should be built from each pattern type. The following example creates a single-threaded engine, in which all tasks are executed sequentially in a single thread.
    engine = SingleThreadedActionEngine(flow)
    engine.run()
A multithreaded engine can be used to run tasks in parallel. This engine uses multiprocessing.pool.ThreadPool by default, but any other thread pool can be passed as a parameter.
    engine = MultiThreadedActionEngine(flow, thread_pool=my_pool)
    engine.run()