Difference between revisions of "TaskFlow/Inputs and Outputs"
Revision as of 12:11, 25 October 2013
Revised on: 10/25/2013 by Ivan Melnikov
Overview
In taskflow there are multiple ways to provide inputs to your tasks, flows and engines and to get outputs from them. This document describes those ways and shows, with examples, how to use them for common usage patterns.
Task & Flow Inputs and Outputs
Tasks accept inputs via task arguments and provide outputs via task results (see Task Arguments and Results for more details). This is the standard and recommended way to pass data from one task to another. However, not every task argument needs to be provided by another task of the flow, and not every task result needs to be consumed by another task.
If a value is required by one or more tasks of the flow but is not provided by any task, it is considered to be a flow input and MUST be put into the storage before the flow is run. The set of names required by a flow can be retrieved via the requires property of the flow.

All values provided by tasks of the flow are considered to be flow outputs; the set of names of such values is available as the provides property of the flow.
For example:
    >>> from taskflow import task
    >>> from taskflow.patterns import linear_flow
    >>>
    >>> class MyTask(task.Task):
    ...     def execute(self, **kwargs):
    ...         return 1, 2
    ...
    >>> flow = linear_flow.Flow('test').add(
    ...     MyTask(requires='a', provides=('b', 'c')),
    ...     MyTask(requires='b', provides='d')
    ... )
    >>> flow.requires
    set(['a'])
    >>> flow.provides
    set(['c', 'b', 'd'])
As you can see, this flow does not require b, as it is provided by the first task.
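The rule described above (flow inputs are the names some task requires but no task provides, and flow outputs are everything any task provides) can be sketched in plain Python. This is a simplified model, not taskflow's implementation: FakeTask, flow_requires and flow_provides are hypothetical names introduced only for illustration, and the model ignores task ordering.

```python
# Plain-Python model of the requires/provides rule; it does not import taskflow.
# FakeTask, flow_requires and flow_provides are hypothetical illustration names.
from collections import namedtuple

FakeTask = namedtuple("FakeTask", ["name", "requires", "provides"])


def flow_provides(tasks):
    """Union of every name produced by any task in the flow."""
    provided = set()
    for t in tasks:
        provided.update(t.provides)
    return provided


def flow_requires(tasks):
    """Names needed by some task but produced by no task in the flow."""
    required = set()
    for t in tasks:
        required.update(t.requires)
    return required - flow_provides(tasks)


# Mirrors the two MyTask instances from the example above.
tasks = [
    FakeTask("first", requires={"a"}, provides={"b", "c"}),
    FakeTask("second", requires={"b"}, provides={"d"}),
]

print(sorted(flow_requires(tasks)))  # ['a']
print(sorted(flow_provides(tasks)))  # ['b', 'c', 'd']
```

In this model b is required by the second task but drops out of the flow inputs because the first task provides it, which matches the observation above.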
Engine Inputs and Outputs
Storage
The storage layer is how an engine persists flow and task details.
For more in-depth design details: persistence.
Inputs
The problem: how to prepopulate your engine with arguments (so that dependent/root tasks can immediately start running). Without any prepopulated values, the following flow fails because nothing provides meow and woof:
    >>> from taskflow import task
    >>> from taskflow import engines
    >>> from taskflow.patterns import linear_flow as lf
    >>>
    >>> class CatTalk(task.Task):
    ...     def execute(self, meow):
    ...         print(meow)
    ...         return "cat"
    ...
    >>> class DogTalk(task.Task):
    ...     def execute(self, woof):
    ...         print(woof)
    ...         return "dog"
    ...
    >>> flo = lf.Flow("cat-dog")
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
    >>> engines.run(flo)
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/lib/python2.6/site-packages/taskflow/engines/helpers.py", line 110, in run
        engine.run()
      File "/usr/lib/python2.6/site-packages/taskflow/utils/lock_utils.py", line 51, in wrapper
        return f(*args, **kwargs)
      File "/usr/lib/python2.6/site-packages/taskflow/engines/action_engine/engine.py", line 104, in run
        raise exc.MissingDependencies(self._flow, sorted(missing))
    taskflow.exceptions.MissingDependencies: taskflow.patterns.linear_flow.Flow: cat-dog; 2 requires ['meow', 'woof'] but no other entity produces said requirements
To solve this, provide the missing values via the store argument when running the flow:
    >>> from taskflow import task
    >>> from taskflow import engines
    >>> from taskflow.patterns import linear_flow as lf
    >>>
    >>> class CatTalk(task.Task):
    ...     def execute(self, meow):
    ...         print(meow)
    ...         return "cat"
    ...
    >>> class DogTalk(task.Task):
    ...     def execute(self, woof):
    ...         print(woof)
    ...         return "dog"
    ...
    >>> flo = lf.Flow("cat-dog")
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
    >>> engines.run(flo, store={'meow': 'meow', 'woof': 'woof'})
    meow
    woof
    {'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}
Note: you can also interact directly with the engine storage layer to add additional values. To do so, create the engine with the load method instead of calling run on the flow, inject the values, and then run the engine:
    >>> flo = lf.Flow("cat-dog")
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
    >>> eng = engines.load(flo, store={'meow': 'meow'})
    >>> eng.storage.inject({"woof": "bark"})
    >>> eng.run()
    meow
    bark
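The missing-dependency failure shown earlier in this section amounts to comparing the names a flow requires with the names already in the store. The following is a minimal plain-Python sketch of such a pre-run check. It is an assumption about the general idea, not the engine's actual code: check_inputs is a hypothetical helper, and the MissingDependencies class defined here is a local stand-in rather than the taskflow exception.

```python
# Plain-Python sketch of a pre-run input check; it does not import taskflow.
class MissingDependencies(Exception):
    """Local stand-in for the error seen in the failing session (hypothetical)."""


def check_inputs(required_names, store):
    """Return True if every required name is in the store, otherwise raise."""
    missing = sorted(set(required_names) - set(store))
    if missing:
        raise MissingDependencies(
            "requires %s but nothing provides them" % (missing,))
    return True


# Both inputs supplied up front, as with store={'meow': ..., 'woof': ...}.
check_inputs(["meow", "woof"], {"meow": "meow", "woof": "woof"})

# Only one input supplied: the check reports the name that is still missing.
try:
    check_inputs(["meow", "woof"], {"meow": "meow"})
except MissingDependencies as exc:
    print(exc)
```

Running a check like this before starting work is what lets the failure surface immediately, before any task has executed.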
Outputs
Note: as you can see, the value returned by the earlier run call is the content of the engine's storage after the run: the provided inputs plus the named results of all tasks that have run.
The same data can be fetched more selectively from the engine's storage:
    >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
    >>> eng.run()
    meow
    woof
    >>> print(eng.storage.fetch_all())
    {'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}
    >>> print(eng.storage.fetch("dog"))
    dog
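To make the inject / fetch / fetch_all vocabulary used above concrete, here is a tiny dictionary-backed model. It is only a sketch: TinyStorage is a hypothetical class, it borrows the method names seen in the sessions above, and it says nothing about how the real storage layer persists data.

```python
# Dictionary-backed model of named-value storage; it does not import taskflow.
# TinyStorage is a hypothetical class used only for illustration.
class TinyStorage(object):
    def __init__(self, initial=None):
        self._values = dict(initial or {})

    def inject(self, pairs):
        """Add (or overwrite) several named values at once."""
        self._values.update(pairs)

    def fetch(self, name):
        """Return one named value; raises KeyError if it is absent."""
        return self._values[name]

    def fetch_all(self):
        """Return a copy of every named value currently stored."""
        return dict(self._values)


storage = TinyStorage({"meow": "meow"})   # like store={'meow': 'meow'}
storage.inject({"woof": "bark"})          # extra input added before the run
storage.inject({"dog": "dog"})            # stands in for a task's named result

print(storage.fetch("dog"))               # dog
print(sorted(storage.fetch_all()))        # ['dog', 'meow', 'woof']
```

Returning a copy from fetch_all keeps callers from mutating the stored values by accident; whether the real storage layer does the same is not something this sketch claims.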
Notifications
What: engines provide a way to receive notifications about task and flow state transitions.
Why: state transition notifications are useful for monitoring, logging, metrics, debugging, affecting further engine state (and other unknown future usage).
Flow notifications
A basic example is the following:
    >>> from taskflow import task
    >>> from taskflow import engines
    >>> from taskflow.patterns import linear_flow as lf
    >>>
    >>> class CatTalk(task.Task):
    ...     def execute(self, meow):
    ...         print(meow)
    ...         return "cat"
    ...
    >>> class DogTalk(task.Task):
    ...     def execute(self, woof):
    ...         print(woof)
    ...         return 'dog'
    ...
    >>> def flow_transition(state, details):
    ...     print("Flow '%s' transition to state %s" % (details['flow_name'], state))
    ...
    >>>
    >>> flo = lf.Flow("cat-dog")
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
    <taskflow.patterns.linear_flow.Flow object at 0x2263050>
    >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
    >>> eng.notifier.register("*", flow_transition)
    >>> eng.run()
    Flow 'cat-dog' transition to state RUNNING
    meow
    woof
    Flow 'cat-dog' transition to state SUCCESS
Task notifications
A basic example is the following:
    >>> from taskflow import task
    >>> from taskflow import engines
    >>> from taskflow.patterns import linear_flow as lf
    >>>
    >>> class CatTalk(task.Task):
    ...     def execute(self, meow):
    ...         print(meow)
    ...         return "cat"
    ...
    >>> class DogTalk(task.Task):
    ...     def execute(self, woof):
    ...         print(woof)
    ...         return 'dog'
    ...
    >>> def task_transition(state, details):
    ...     print("Task '%s' transition to state %s" % (details['task_name'], state))
    ...
    >>>
    >>> flo = lf.Flow("cat-dog")
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
    <taskflow.patterns.linear_flow.Flow object at 0x22634d0>
    >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
    >>> eng.task_notifier.register("*", task_transition)
    >>> eng.run()
    Task '__main__.CatTalk' transition to state RUNNING
    meow
    Task '__main__.CatTalk' transition to state SUCCESS
    Task '__main__.DogTalk' transition to state RUNNING
    woof
    Task '__main__.DogTalk' transition to state SUCCESS
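The register("*", callback) calls above follow a common publish/subscribe pattern: callbacks are stored per state name, and a wildcard entry receives every transition. The sketch below models that pattern in plain Python under stated assumptions: TinyNotifier and its notify method are hypothetical, and the real notifier may order or filter callbacks differently.

```python
# Plain-Python model of a state-transition notifier; it does not import taskflow.
# TinyNotifier is a hypothetical class used only for illustration.
import collections

ANY = "*"  # wildcard: receive every state transition


class TinyNotifier(object):
    def __init__(self):
        self._listeners = collections.defaultdict(list)

    def register(self, state, callback):
        """Call callback(state, details) whenever the given state is reached."""
        self._listeners[state].append(callback)

    def notify(self, state, details):
        """Invoke callbacks registered for this state, then wildcard callbacks."""
        callbacks = (list(self._listeners.get(state, [])) +
                     list(self._listeners.get(ANY, [])))
        for callback in callbacks:
            callback(state, details)


seen = []


def task_transition(state, details):
    seen.append((details["task_name"], state))


notifier = TinyNotifier()
notifier.register(ANY, task_transition)

# Simulate what an engine would emit while running a single task.
notifier.notify("RUNNING", {"task_name": "CatTalk"})
notifier.notify("SUCCESS", {"task_name": "CatTalk"})

print(seen)  # [('CatTalk', 'RUNNING'), ('CatTalk', 'SUCCESS')]
```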
Common notification classes
There are common helper classes (listeners) that can be used to handle common notification patterns.
A basic example is the following:
    >>> from taskflow import task
    >>> from taskflow import engines
    >>> from taskflow.listeners import printing
    >>> from taskflow.patterns import linear_flow as lf
    >>>
    >>> class CatTalk(task.Task):
    ...     def execute(self, meow):
    ...         print(meow)
    ...         return "cat"
    ...
    >>> class DogTalk(task.Task):
    ...     def execute(self, woof):
    ...         print(woof)
    ...         return 'dog'
    ...
    >>>
    >>> flo = lf.Flow("cat-dog")
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
    >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
    >>> with printing.PrintingListener(eng):
    ...     eng.run()
    ...
    SingleThreadedActionEngine: 49258576 has moved flow 'cat-dog' (...c) into state 'RUNNING'
    SingleThreadedActionEngine: 49258576 has moved task '__main__.CatTalk' (...) into state 'RUNNING'
    meow
    SingleThreadedActionEngine: 49258576 has moved task '__main__.CatTalk' (...) into state 'SUCCESS' with result 'cat' (failure=False)
    SingleThreadedActionEngine: 49258576 has moved task '__main__.DogTalk' (...) into state 'RUNNING'
    woof
    SingleThreadedActionEngine: 49258576 has moved task '__main__.DogTalk' (...) into state 'SUCCESS' with result 'dog' (failure=False)
    SingleThreadedActionEngine: 49258576 has moved flow 'cat-dog' (...) into state 'SUCCESS'
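The with block above suggests that a listener attaches itself for the duration of the block. A minimal sketch of that shape, assuming the listener registers its callback on entry and removes it on exit, could look as follows. RecordingListener is a hypothetical class, and a plain list of callbacks stands in for the engine's notifier.

```python
# Plain-Python sketch of a listener used as a context manager; it does not
# import taskflow. RecordingListener is a hypothetical illustration class.
class RecordingListener(object):
    def __init__(self, callbacks, log):
        self._callbacks = callbacks  # list standing in for an engine notifier
        self._log = log              # where transition messages are recorded

    def _on_transition(self, state, details):
        self._log.append("%s -> %s" % (details["name"], state))

    def __enter__(self):
        # Attach for the duration of the with block.
        self._callbacks.append(self._on_transition)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Detach again, even if the block raised; do not swallow exceptions.
        self._callbacks.remove(self._on_transition)
        return False


callbacks = []
log = []

with RecordingListener(callbacks, log):
    # Simulate the engine announcing a transition while the listener is attached.
    for callback in list(callbacks):
        callback("RUNNING", {"name": "cat-dog"})

print(log)        # ['cat-dog -> RUNNING']
print(callbacks)  # []
```

Tying registration to a with block means the listener cannot be left attached by mistake once the run is over.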