Jump to: navigation, search

Difference between revisions of "TaskFlow/Inputs and Outputs"

(Task & Flow Inputs and Outputs)
(Replaced content with "'''Revised on:''' {{REVISIONMONTH1}}/{{REVISIONDAY}}/{{REVISIONYEAR}} by {{REVISIONUSER}} The page was moved to developers documentation: http://docs.openstack.org/develo...")
 
(2 intermediate revisions by 2 users not shown)
Line 1: Line 1:
 
'''Revised on:''' {{REVISIONMONTH1}}/{{REVISIONDAY}}/{{REVISIONYEAR}} by {{REVISIONUSER}}
 
'''Revised on:''' {{REVISIONMONTH1}}/{{REVISIONDAY}}/{{REVISIONYEAR}} by {{REVISIONUSER}}
  
== Overview ==
+
The page was moved to developers documentation: http://docs.openstack.org/developer/taskflow/inputs_and_outputs.html
 
 
In taskflow there are multiple ways to design how your tasks/flows and engines get inputs  and produce outputs. This document will help you understand what those ways are and how to use those ways to accomplish your desired taskflow usage pattern as well as include examples that show common ways of providing input and getting output.
 
 
 
=== Task & Flow Inputs and Outputs ===
 
 
 
A task accept inputs via ''task arguments'' and provides outputs via
 
''task results'' (see [[TaskFlow/Task_Arguments_and_Results|task
 
arguments and results]] for more details). This the standard and recommended way
 
to pass data from one task to another. Of course not every task argument needs to
 
be provided to some other task of a flow, and not every task result
 
should be consumed by every task.
 
 
 
If some value is required by one or more tasks of a flow, but is not provided
 
by any task, it is considered to be '''flow input''', and '''MUST''' be put into the
 
[[TaskFlow/Persistence#Storage|storage]] before the flow is run. A set of names
 
required by a flow can be retrieved via that flows <code>requires</code> property.
 
 
 
All values provided by tasks of the flow are considered to be '''flow outputs''';
 
the set of names of such values is available via the <code>provides</code>
 
property of the flow.
 
 
 
'''For example''':
 
 
 
    >>> class MyTask(task.Task):
 
    ...    def execute(self, **kwargs):
 
    ...        return 1, 2
 
    ...
 
    >>> flow = linear_flow.Flow('test').add(
 
    ...    MyTask(requires='a', provides=('b', 'c')),
 
    ...    MyTask(requires='b', provides='d')
 
    ... )
 
    >>> flow.requires
 
    set(['a'])
 
    >>> flow.provides
 
    set(['c', 'b', 'd'])
 
 
 
As you can see, this flow does not require <code>b</code>, as it is provided by the
 
fist task.
 
 
 
=== Engine Inputs and Outputs ===
 
 
 
==== Storage ====
 
 
 
The storage layer is how an engine persists flow and task details.
 
 
 
For more in-depth design details: [[TaskFlow/Persistence|persistence]].
 
 
 
===== Inputs =====
 
 
 
'''The problem:''' you should prepopulate storage with all required flow inputs before running it:
 
 
 
    >>> from taskflow import task
 
    >>> from taskflow import engines
 
    >>> from taskflow.patterns import linear_flow as lf
 
    >>>
 
    >>> class CatTalk(task.Task):
 
    ...  def execute(self, meow):
 
    ...    print meow
 
    ...    return "cat"
 
    ...
 
    >>> class DogTalk(task.Task):
 
    ...  def execute(self, woof):
 
    ...    print woof
 
    ...    return "dog"
 
    ...
 
    >>> flo = lf.Flow("cat-dog")
 
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
 
    >>> engines.run(flo)
 
    Traceback (most recent call last):
 
      File "<stdin>", line 1, in <module>
 
      File "/usr/lib/python2.6/site-packages/taskflow/engines/helpers.py", line 110, in run
 
        engine.run()
 
      File "/usr/lib/python2.6/site-packages/taskflow/utils/lock_utils.py", line 51, in wrapper
 
        return f(*args, **kwargs)
 
      File "/usr/lib/python2.6/site-packages/taskflow/engines/action_engine/engine.py", line 104, in run
 
        raise exc.MissingDependencies(self._flow, sorted(missing))
 
    taskflow.exceptions.MissingDependencies: taskflow.patterns.linear_flow.Flow: cat-dog;
 
    2 requires ['meow', 'woof'] but no other entity produces said requirements
 
 
 
'''The solution''': provide necessary data via <code>store</code> parameter of <code>engines.run</code>:
 
 
 
    >>> from taskflow import task
 
    >>> from taskflow import engines
 
    >>> from taskflow.patterns import linear_flow as lf
 
    >>>
 
    >>> class CatTalk(task.Task):
 
    ...  def execute(self, meow):
 
    ...    print meow
 
    ...    return "cat"
 
    ...
 
    >>> class DogTalk(task.Task):
 
    ...  def execute(self, woof):
 
    ...    print woof
 
    ...    return "dog"
 
    ...
 
    >>> flo = lf.Flow("cat-dog")
 
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
 
    >>> engines.run(flo, store={'meow': 'meow', 'woof': 'woof'})
 
    meow
 
    woof
 
    {'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}
 
 
 
'''Note:''' you can also directly interact with the engine storage layer to add additional values although you must use the <code>load</code> method instead.
 
 
 
    >>> flo = lf.Flow("cat-dog")
 
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
 
    >>> eng = engines.load(flo, store={'meow': 'meow'})
 
    >>> eng.storage.inject({"woof": "bark"})
 
    >>> eng.run()
 
    meow
 
    bark
 
 
 
===== Outputs =====
 
 
 
As you can see from examples above, <code>run</code> method returns all flow outputs
 
in a <code>dict</code>. This same data can be fetched via <code>fetch_all</code> method
 
of the storage, or in a more precise manner by using <code>fetch</code> method.
 
 
 
'''For example''':
 
 
 
    >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
 
    >>> eng.run()
 
    meow
 
    woof
 
    >>> print(eng.storage.fetch_all())
 
    {'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}
 
    >>> print(eng.storage.fetch("dog"))
 
    dog
 
 
 
==== Notifications ====
 
 
 
'''What:''' engines provide a way to receive notification on task and flow state transitions.
 
 
 
'''Why:''' state transition notifications are useful for monitoring, logging, metrics, debugging, affecting further engine state (and other unknown ''future'' usage).
 
 
 
===== Flow notifications =====
 
 
 
A basic example is the following:
 
 
 
    >>> from taskflow import task
 
    >>> from taskflow import engines
 
    >>> from taskflow.patterns import linear_flow as lf
 
    >>>
 
    >>> class CatTalk(task.Task):
 
    ...  def execute(self, meow):
 
    ...    print(meow)
 
    ...    return "cat"
 
    ...
 
    >>> class DogTalk(task.Task):
 
    ...  def execute(self, woof):
 
    ...    print(woof)
 
    ...    return 'dog'
 
    ...
 
    >>> def flow_transition(state, details):
 
    ...    print("Flow '%s' transition to state %s" % (details['flow_name'], state))
 
    ...
 
    >>>
 
    >>> flo = lf.Flow("cat-dog")
 
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
 
    <taskflow.patterns.linear_flow.Flow object at 0x2263050>
 
    >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
 
    >>> eng.notifier.register("*", flow_transition)
 
    >>> eng.run()
 
    Flow 'cat-dog' transition to state RUNNING
 
    meow
 
    woof
 
    Flow 'cat-dog' transition to state SUCCESS
 
 
 
===== Task notifications =====
 
 
 
A basic example is the following:
 
 
 
    >>> from taskflow import task
 
    >>> from taskflow import engines
 
    >>> from taskflow.patterns import linear_flow as lf
 
    >>>
 
    >>> class CatTalk(task.Task):
 
    ...  def execute(self, meow):
 
    ...    print(meow)
 
    ...    return "cat"
 
    ...
 
    >>> class DogTalk(task.Task):
 
    ...  def execute(self, woof):
 
    ...    print(woof)
 
    ...    return 'dog'
 
    ...
 
    >>> def task_transition(state, details):
 
    ...    print("Task '%s' transition to state %s" % (details['task_name'], state))
 
    ...
 
    >>>
 
    >>> flo = lf.Flow("cat-dog")
 
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
 
    <taskflow.patterns.linear_flow.Flow object at 0x22634d0>
 
    >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
 
    >>> eng.task_notifier.register("*", task_transition)
 
    >>> eng.run()
 
    Task '__main__.CatTalk' transition to state RUNNING
 
    meow
 
    Task '__main__.CatTalk' transition to state SUCCESS
 
    Task '__main__.DogTalk' transition to state RUNNING
 
    woof
 
    Task '__main__.DogTalk' transition to state SUCCESS
 
 
 
===== Common notification classes =====
 
 
 
There exists common helper classes that can be used to accomplish common ways of notifying.
 
 
 
* [http://github.com/stackforge/taskflow/blob/master/taskflow/listeners/printing.py Helper]  to output to stderr/stdout
 
* [http://github.com/stackforge/taskflow/blob/master/taskflow/listeners/logging.py Helper]  to output to a logging backend
 
 
 
 
 
'''A basic example is the following:'''
 
    >>> from taskflow import task
 
    >>> from taskflow import engines
 
    >>> from taskflow.listeners import printing
 
    >>> from taskflow.patterns import linear_flow as lf
 
    >>>
 
    >>> class CatTalk(task.Task):
 
    ...  def execute(self, meow):
 
    ...    print(meow)
 
    ...    return "cat"
 
    ...
 
    >>> class DogTalk(task.Task):
 
    ...  def execute(self, woof):
 
    ...    print(woof)
 
    ...    return 'dog'
 
    ...
 
    >>>
 
    >>> flo = lf.Flow("cat-dog")
 
    >>> flo.add(CatTalk(), DogTalk(provides="dog"))
 
    >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
 
    >>> with printing.PrintingListener(eng):
 
    ...  eng.run()
 
    ...
 
    SingleThreadedActionEngine: 49258576 has moved flow 'cat-dog' (...c) into state 'RUNNING'
 
    SingleThreadedActionEngine: 49258576 has moved task '__main__.CatTalk' (...) into state 'RUNNING'
 
    meow
 
    SingleThreadedActionEngine: 49258576 has moved task '__main__.CatTalk' (...) into state 'SUCCESS' with result 'cat' (failure=False)
 
    SingleThreadedActionEngine: 49258576 has moved task '__main__.DogTalk' (...) into state 'RUNNING'
 
    woof
 
    SingleThreadedActionEngine: 49258576 has moved task '__main__.DogTalk' (...) into state 'SUCCESS' with result 'dog' (failure=False)
 
    SingleThreadedActionEngine: 49258576 has moved flow 'cat-dog' (...) into state 'SUCCESS'
 

Latest revision as of 10:10, 28 March 2014

Revised on: 3/28/2014 by Akarpinska

The page was moved to developers documentation: http://docs.openstack.org/developer/taskflow/inputs_and_outputs.html