Jump to: navigation, search

Difference between revisions of "TaskFlow/Inputs and Outputs"

(Common notification classes)
(Common notification classes)
Line 177: Line 177:
 
* [http://github.com/stackforge/taskflow/blob/master/taskflow/listeners/printing.py Helper]  to output to stderr/stdout
 
* [http://github.com/stackforge/taskflow/blob/master/taskflow/listeners/printing.py Helper]  to output to stderr/stdout
 
* [http://github.com/stackforge/taskflow/blob/master/taskflow/listeners/logging.py Helper]  to output to a logging backend
 
* [http://github.com/stackforge/taskflow/blob/master/taskflow/listeners/logging.py Helper]  to output to a logging backend
 +
  
 
'''A basic example is the following:'''
 
'''A basic example is the following:'''
 
 
     >>> from taskflow import task
 
     >>> from taskflow import task
 
     >>> from taskflow import engines
 
     >>> from taskflow import engines
Line 202: Line 202:
 
     ...  eng.run()
 
     ...  eng.run()
 
     ...
 
     ...
     SingleThreadedActionEngine: 49258576 has moved flow 'cat-dog' (2bde9287-bef2-4314-b193-7871610d08bc) into state 'RUNNING'
+
     SingleThreadedActionEngine: 49258576 has moved flow 'cat-dog' (...c) into state 'RUNNING'
     SingleThreadedActionEngine: 49258576 has moved task '__main__.CatTalk' (e039053d-7263-4a97-bc36-27ac91d6cf39) into state 'RUNNING'
+
     SingleThreadedActionEngine: 49258576 has moved task '__main__.CatTalk' (...) into state 'RUNNING'
 
     meow
 
     meow
     SingleThreadedActionEngine: 49258576 has moved task '__main__.CatTalk' (e039053d-7263-4a97-bc36-27ac91d6cf39) into state 'SUCCESS' with result 'cat' (failure=False)
+
     SingleThreadedActionEngine: 49258576 has moved task '__main__.CatTalk' (...) into state 'SUCCESS' with result 'cat' (failure=False)
     SingleThreadedActionEngine: 49258576 has moved task '__main__.DogTalk' (8be23ed2-6597-4005-afeb-eee22c890874) into state 'RUNNING'
+
     SingleThreadedActionEngine: 49258576 has moved task '__main__.DogTalk' (...) into state 'RUNNING'
 
     woof
 
     woof
     SingleThreadedActionEngine: 49258576 has moved task '__main__.DogTalk' (8be23ed2-6597-4005-afeb-eee22c890874) into state 'SUCCESS' with result 'dog' (failure=False)
+
     SingleThreadedActionEngine: 49258576 has moved task '__main__.DogTalk' (...) into state 'SUCCESS' with result 'dog' (failure=False)
     SingleThreadedActionEngine: 49258576 has moved flow 'cat-dog' (2bde9287-bef2-4314-b193-7871610d08bc) into state 'SUCCESS'
+
     SingleThreadedActionEngine: 49258576 has moved flow 'cat-dog' (...c) into state 'SUCCESS'

Revision as of 06:12, 25 October 2013

Revised on: 10/25/2013 by Harlowja

Overview

In taskflow there are multiple ways to design how your tasks/flows and engines get inputs and produce outputs. This document will help you understand what those ways are and how to use those ways to accomplish your desired taskflow usage pattern as well as include examples that show common ways of providing input and getting output.

Task & Flow Inputs and Outputs

See: Task & Flow Arguments and Results

Engine Inputs and Outputs

Storage

The storage layer is how an engine persists flow and task details.

For more in-depth design details: persistence.

Inputs

The problem: how to prepopulate your engine with arguments (so that dependent/root tasks can immediately start running).

   >>> from taskflow import task
   >>> from taskflow import engines
   >>> from taskflow.patterns import linear_flow as lf
   >>> 
  >>> class CatTalk(task.Task):
   ...   def execute(self, meow):
   ...     print meow
   ...     return "cat"
   ... 
   >>> class DogTalk(task.Task):
   ...   def execute(self, woof):
   ...     print woof
   ...     return "dog"
   ... 
   >>> flo = lf.Flow("cat-dog")
   >>> flo.add(CatTalk(), DogTalk(provides="dog"))
   >>> engines.run(flo)
   Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
     File "/usr/lib/python2.6/site-packages/taskflow/engines/helpers.py", line 110, in run
       engine.run()
     File "/usr/lib/python2.6/site-packages/taskflow/utils/lock_utils.py", line 51, in wrapper
       return f(*args, **kwargs)
     File "/usr/lib/python2.6/site-packages/taskflow/engines/action_engine/engine.py", line 104, in run
       raise exc.MissingDependencies(self._flow, sorted(missing))
   taskflow.exceptions.MissingDependencies: taskflow.patterns.linear_flow.Flow: cat-dog; 
   2 requires ['meow', 'woof'] but no other entity produces said requirements

To solve this you would want to do the following to make your flow run smoothly:

   >>> from taskflow import task
   >>> from taskflow import engines
   >>> from taskflow.patterns import linear_flow as lf
   >>> 
   >>> class CatTalk(task.Task):
   ...   def execute(self, meow):
   ...     print meow
   ...     return "cat"
   ... 
   >>> class DogTalk(task.Task):
   ...   def execute(self, woof):
   ...     print woof
   ...     return "dog"
   ... 
   >>> flo = lf.Flow("cat-dog")
   >>> flo.add(CatTalk(), DogTalk(provides="dog"))
   >>> engines.run(flo, store={'meow': 'meow', 'woof': 'woof'})
   meow
   woof
   {'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}

Note: you can also directly interact with the engine storage layer to add additional values although you must use the load method instead.

   >>> flo = lf.Flow("cat-dog")
   >>> flo.add(CatTalk(), DogTalk(provides="dog"))
   >>> eng = engines.load(flo, store={'meow': 'meow'})
   >>> eng.storage.inject({"woof": "bark"})
   >>> eng.run()
   meow
   bark
Outputs

Note: as you can see the result of the previous run method is the results of all tasks that have ran.

This same data can be fetched in a more precise manner by doing the following:

   >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
   >>> eng.run()
   meow
   woof
   >>> print(eng.storage.fetch_all())
   {'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}
   >>> print(eng.storage.fetch("dog"))
   dog

Notifications

What: engines provide a way to receive notification on task and flow state transitions.

Why: state transition notifications are useful for monitoring, logging, metrics, debugging, affecting further engine state (and other unknown future usage).

Flow notifications

A basic example is the following:

   >>> from taskflow import task
   >>> from taskflow import engines
   >>> from taskflow.patterns import linear_flow as lf
   >>> 
   >>> class CatTalk(task.Task):
   ...   def execute(self, meow):
   ...     print(meow)
   ...     return "cat"
   ... 
   >>> class DogTalk(task.Task):
   ...   def execute(self, woof):
   ...     print(woof)
   ...     return 'dog'
   ... 
   >>> def flow_transition(state, details):
   ...     print("Flow '%s' transition to state %s" % (details['flow_name'], state))
   ... 
   >>> 
   >>> flo = lf.Flow("cat-dog")
   >>> flo.add(CatTalk(), DogTalk(provides="dog"))
   <taskflow.patterns.linear_flow.Flow object at 0x2263050>
   >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
   >>> eng.notifier.register("*", flow_transition)
   >>> eng.run()
   Flow 'cat-dog' transition to state RUNNING
   meow
   woof
   Flow 'cat-dog' transition to state SUCCESS
Task notifications

A basic example is the following:

   >>> from taskflow import task
   >>> from taskflow import engines
   >>> from taskflow.patterns import linear_flow as lf
   >>> 
   >>> class CatTalk(task.Task):
   ...   def execute(self, meow):
   ...     print(meow)
   ...     return "cat"
   ... 
   >>> class DogTalk(task.Task):
   ...   def execute(self, woof):
   ...     print(woof)
   ...     return 'dog'
   ... 
   >>> def task_transition(state, details):
   ...     print("Task '%s' transition to state %s" % (details['task_name'], state))
   ... 
   >>> 
   >>> flo = lf.Flow("cat-dog")
   >>> flo.add(CatTalk(), DogTalk(provides="dog"))
   <taskflow.patterns.linear_flow.Flow object at 0x22634d0>
   >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
   >>> eng.task_notifier.register("*", task_transition)
   >>> eng.run()
   Task '__main__.CatTalk' transition to state RUNNING
   meow
   Task '__main__.CatTalk' transition to state SUCCESS
   Task '__main__.DogTalk' transition to state RUNNING
   woof
   Task '__main__.DogTalk' transition to state SUCCESS
Common notification classes

There exists common helper classes that can be used to accomplish common ways of notifying.

  • Helper to output to stderr/stdout
  • Helper to output to a logging backend


A basic example is the following:

   >>> from taskflow import task
   >>> from taskflow import engines
   >>> from taskflow.listeners import printing
   >>> from taskflow.patterns import linear_flow as lf
   >>>
   >>> class CatTalk(task.Task):
   ...   def execute(self, meow):
   ...     print(meow)
   ...     return "cat"
   ...
   >>> class DogTalk(task.Task):
   ...   def execute(self, woof):
   ...     print(woof)
   ...     return 'dog'
   ...
   >>>
   >>> flo = lf.Flow("cat-dog")
   >>> flo.add(CatTalk(), DogTalk(provides="dog"))
   >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
   >>> with printing.PrintingListener(eng):
   ...   eng.run()
   ...
   SingleThreadedActionEngine: 49258576 has moved flow 'cat-dog' (...c) into state 'RUNNING'
   SingleThreadedActionEngine: 49258576 has moved task '__main__.CatTalk' (...) into state 'RUNNING'
   meow
   SingleThreadedActionEngine: 49258576 has moved task '__main__.CatTalk' (...) into state 'SUCCESS' with result 'cat' (failure=False)
   SingleThreadedActionEngine: 49258576 has moved task '__main__.DogTalk' (...) into state 'RUNNING'
   woof
   SingleThreadedActionEngine: 49258576 has moved task '__main__.DogTalk' (...) into state 'SUCCESS' with result 'dog' (failure=False)
   SingleThreadedActionEngine: 49258576 has moved flow 'cat-dog' (...c) into state 'SUCCESS'