
TaskFlow/Retry

Revision as of 20:09, 29 March 2014 by Harlowja

Revised on: 3/29/2014 by Harlowja

Summary

A retry is an atom that handles a flow failure and can rerun the flow with different parameters. It behaves much like a task: it can be executed and reverted. Each flow pattern (subflow) can have at most one retry. If a task in that subflow fails, the retry's on_failure method is called to decide how the failure is handled. It can choose one of three actions:

  • RETRY - runs the subflow again
  • REVERT - reverts only the current subflow; the retry of the parent flow can then handle the failure
  • REVERT_ALL - reverts the whole flow
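
The three outcomes can be pictured with a small pure-Python model. This is only an illustrative sketch, not TaskFlow's engine: Decision, run_subflow and give_up_after_two are hypothetical names, the real on_failure signature is not reproduced here, and reverting is left out entirely.

```python
import enum


class Decision(enum.Enum):
    """Hypothetical stand-ins for the three actions a retry can choose."""
    RETRY = "RETRY"
    REVERT = "REVERT"
    REVERT_ALL = "REVERT_ALL"


def run_subflow(tasks, on_failure, max_rounds=10):
    """Run callables in order; on failure, ask on_failure what to do.

    Returns ("SUCCESS", failures) or the name of the non-RETRY decision
    that ended the loop. max_rounds only keeps this toy from looping forever.
    """
    failures = []
    for _ in range(max_rounds):
        try:
            for run_task in tasks:
                run_task()
            return "SUCCESS", failures
        except Exception as exc:
            failures.append(exc)
            decision = on_failure(failures)
            if decision is not Decision.RETRY:
                return decision.value, failures
    return Decision.REVERT.value, failures


def give_up_after_two(failures):
    """Retry once, then hand the failure to the parent flow's retry."""
    return Decision.RETRY if len(failures) < 2 else Decision.REVERT


def always_fails():
    raise RuntimeError("boom")


calls = []


def flaky():
    """Fails on the first call and succeeds afterwards."""
    calls.append(1)
    if len(calls) == 1:
        raise RuntimeError("first attempt fails")


outcome_flaky, failures_flaky = run_subflow([flaky], give_up_after_two)
outcome_broken, failures_broken = run_subflow([always_fails], give_up_after_two)
```

In this model the flaky subflow succeeds on its second run, while the always-failing one ends with REVERT after two failures, which is the point at which a parent retry would take over.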


Usage

The following example shows how retry is used in a flow.

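 from taskflow import retry, task
 from taskflow.patterns import linear_flow

 # task.Task stands in for any concrete Task subclass.
 flow = linear_flow.Flow('f1').add(
     task.Task('t1'),
     # 'r1' controls only the tasks nested in 'f2'.
     linear_flow.Flow('f2', retry=retry.ForEach(values=['a', 'b', 'c'], name='r1', provides='value')).add(
         task.Task('t2'),
         task.Task('t3', requires='value')),
     task.Task('t4'))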

In this example the flow 'f2' has a retry controller 'r1', an instance of the default retry controller retry.ForEach, which accepts a collection of values and iterates over it. On each run ForEach returns the next value from the collection, and it stops retrying the subflow once no values are left. If task 't2' or 't3' fails, flow 'f2' is reverted and 'r1' retries it with the next value from the given collection ['a', 'b', 'c']. If task 't1' or 't4' fails, 'r1' does not retry anything, because those tasks belong to flow 'f1' and do not depend on 'r1'.
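
The behaviour described for 'f2' can be imitated in plain Python. The sketch below is not the TaskFlow API: run_with_for_each and the t2/t3 functions are hypothetical, and reverting is reduced to recording which completed tasks would be undone.

```python
def run_with_for_each(values, tasks):
    """Try the task list once per value until one attempt succeeds.

    Each task is a callable taking the current value. Returns the value
    that worked plus a log of attempts, or re-raises the last error once
    the collection is exhausted.
    """
    log = []
    last_error = None
    for value in values:
        done = []
        try:
            for run_task in tasks:
                run_task(value)
                done.append(run_task.__name__)
            log.append(("ok", value))
            return value, log
        except Exception as exc:
            last_error = exc
            # Completed tasks are undone in reverse order before the next try.
            log.append(("reverted", value, list(reversed(done))))
    if last_error is None:
        raise ValueError("no values to iterate over")
    raise last_error


def t2(value):
    pass  # always succeeds in this toy example


def t3(value):
    if value != "c":
        raise ValueError("t3 rejects %r" % value)


winner, log = run_with_for_each(["a", "b", "c"], [t2, t3])
```

Here 't3' fails for 'a' and 'b', so 't2' is recorded as reverted twice before the attempt with 'c' succeeds.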

If a flow with a retry has a nested flow, as shown in the next example,

 flow = linear_flow.Flow('f1', retry=retry.ForEach(values=['a', 'b', 'c'], name='r1', provides='value')).add(
     task.Task('t1'),
     linear_flow.Flow('f2').add(
         task.Task('t2'),
         task.Task('t3', requires='value')),
     task.Task('t4'))

the failure of any task triggers retry 'r1', because all tasks are nested in flow 'f1' and depend on 'r1'.
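
Which retry handles a given task's failure depends only on nesting. One rough way to picture this uses plain dictionaries instead of real flow objects. The retry_scopes helper and the dictionary layout are assumptions made for illustration only.

```python
def retry_scopes(flow, inherited=None):
    """Map each task name to the name of the nearest enclosing retry.

    A flow is modelled as {"name": ..., "retry": name-or-None, "items": [...]},
    where items are task names (str) or nested flow dictionaries.
    """
    current = flow.get("retry") or inherited
    scopes = {}
    for item in flow["items"]:
        if isinstance(item, dict):
            scopes.update(retry_scopes(item, current))
        else:
            scopes[item] = current
    return scopes


# First example: only the inner flow 'f2' has a retry.
inner_retry = {
    "name": "f1", "retry": None,
    "items": ["t1", {"name": "f2", "retry": "r1", "items": ["t2", "t3"]}, "t4"],
}

# Second example: the outer flow 'f1' has the retry.
outer_retry = {
    "name": "f1", "retry": "r1",
    "items": ["t1", {"name": "f2", "retry": None, "items": ["t2", "t3"]}, "t4"],
}

inner_scopes = retry_scopes(inner_retry)
outer_scopes = retry_scopes(outer_retry)
```

In the first layout only 't2' and 't3' map to 'r1'; in the second, every task does.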

Default retries

There are some default retry controllers that can be used in most cases.

  • AlwaysRevert - Reverts the current subflow; the parent retry can then handle the error.
  • AlwaysRevertAll - Reverts the whole flow on subflow failure.
  • Times - Retries a flow a given number of times and returns the attempt number. Use this retry when an operation should be retried several times with the same parameters. In the following example the 'send_message' flow will try to execute SendMessageTask up to 5 times.
 flow = linear_flow.Flow('send_message', retry=retry.Times(5)).add(
     SendMessageTask('sender'))
  • ForEach - Accepts a collection of values in its constructor and returns the next element of the collection on each try.

In the following example the flow tries to connect to a server using a list of possible IPs. On each attempt the retry returns one IP from the list. On failure it returns the next one until the list is exhausted, and then the flow is reverted.

 server_ips = ['192.168.3.5', '192.168.3.6', '192.168.4.1']
 flow = linear_flow.Flow('send_message', retry=retry.ForEach(server_ips, provides='ip')).add(
     ConnectToServer(requires='ip'))
  • ParameterizedForEach - The same as ForEach, but it takes the collection from storage, as the values parameter of its execute method.

Let's implement the previous example using ParameterizedForEach.

 import taskflow.engines

 server_ips = ['192.168.3.5', '192.168.3.6', '192.168.4.1']
 flow = linear_flow.Flow('send_message',
                         retry=retry.ParameterizedForEach(rebind={'values': 'server_ips'},
                                                          provides='ip')).add(
     ConnectToServer(requires='ip'))
 
 # The collection is supplied at run time through the engine's store.
 taskflow.engines.run(flow, store={'server_ips': server_ips})
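
The difference from ForEach is only where the collection comes from. A minimal stand-alone sketch follows, assuming that rebind simply maps an argument name to a key in the store. resolve_arguments and next_untried are hypothetical helpers, not TaskFlow functions.

```python
def resolve_arguments(rebind, store):
    """Look up each rebound argument name in the store."""
    return {argument: store[key] for argument, key in rebind.items()}


def next_untried(values, tried):
    """Return the first value not attempted yet, or None if none remain."""
    for value in values:
        if value not in tried:
            return value
    return None


# The store plays the role of the engine's storage in this toy model.
store = {"server_ips": ["192.168.3.5", "192.168.3.6", "192.168.4.1"]}
arguments = resolve_arguments({"values": "server_ips"}, store)

tried = []
first = next_untried(arguments["values"], tried)
tried.append(first)
second = next_untried(arguments["values"], tried)
tried.extend([second, "192.168.4.1"])
exhausted = next_untried(arguments["values"], tried)
```

Because the values are resolved from the store at run time, the same flow definition could be run against a different list of servers just by passing a different store.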