Jump to: navigation, search

Difference between revisions of "TaskFlow/Worker-based Engine Protocol"

(Adjusted header levels)
Line 1: Line 1:
  
== Worker-based flow ==
+
== Worker-based engine ==
Worker-based flow uses remote workers to perform high load, computational and time-consuming tasks. There are two communication components - <code>Proxy</code> and <code>Worker</code>.
+
<code>Worker-based engine</code> uses remote workers to perform high load, computational and time-consuming tasks. There are two communication sides - <code>Executor</code> and <code>Worker</code> that communicate using <code>Proxy</code> component. <code>Proxy</code> is designed to accept/publish messages from/into named exchange.
  
=== Proxy ===
+
=== Executor ===
<code>Proxy</code> is a part of the worker-based engine and is used to publish task requests, so these requests can be accepted and processed by remote <code>Workers</code>.
+
<code>Executor</code> is a part of the worker-based engine and is used to publish task requests, so these requests can be accepted and processed by remote <code>Workers</code>.
  
 
=== Worker ===
 
=== Worker ===
<code>Worker</code> is started on remote host and has list of tasks it can perform. <code>Worker</code> accepts and processes task requests that are published by <code>Proxy</code>. Several requests can be processed simultaneously in separate threads (executor can be passed to the <code>Worker</code> and configured). Workers that can process same set of tasks can be grouped under common <code>topic</code>.
+
<code>Worker</code> is started on remote host and has list of tasks it can perform. <code>Worker</code> accepts and processes task requests that are published by <code>Executor</code>. Several requests can be processed simultaneously in separate threads (executor can be passed to the <code>Worker</code> and configured). Workers that can process same set of tasks can be grouped under common <code>topic</code>.
  
 
== Protocol ==
 
== Protocol ==
<code>Proxy</code> and <code>Workers</code> communicate in one common named exchange. <code>Proxy</code> publishes task requests to the named exchange and <code>Workers</code> take requests from it. When <code>Worker</code> gets task request it parses all parameters, dispatches endpoint and starts task processing. During task processing <code>Worker</code> sends responses back to <code>Proxy</code>.
+
<code>Executor</code> and <code>Workers</code> communicate in one common named exchange. <code>Executor</code> publishes task requests to the named exchange and <code>Workers</code> take requests from it. When <code>Worker</code> gets task request it parses all parameters, dispatches endpoint and starts task processing. During task processing <code>Worker</code> sends responses back to <code>Executor</code>.
  
==== Proxy and Worker communication ====
+
==== Executor and Worker communication ====
Let's consider how communication between <code>Proxy</code> and <code>Worker</code> happens.
+
Let's consider how communication between <code>Executor</code> and <code>Worker</code> happens.
First of all engine resolves all tasks dependencies and schedules tasks that can be performed at the moment. Tasks are executed by worker-based engine with WorkerTaskExecutor. WorkerTaskExecutor initiates task execution/reversion using <code>Proxy</code>.
+
First of all engine resolves all tasks dependencies and schedules tasks that can be performed at the moment. Tasks are executed by worker-based engine <code>Executor</code>. <code>Executor</code> initiates task execution/reversion using <code>Proxy</code>.
* <code>Proxy</code> publishes task request (format is described below) into named exchange with <code>routing key</code>, that is used to deliver request to particular workers <code>topic</code> and waits for task requests to be accepted and confirmed by <code>Workers</code>. If <code>Proxy</code> doesn't get task confirmation from <code>Workers</code> within the given timeout task is considered as timed-out and the <code>Timeout</code> exception is raised.
+
* <code>Executor</code> publishes task request (format is described below) into named exchange with <code>routing key</code>, that is used to deliver request to particular workers <code>topic</code> and waits for task requests to be accepted and confirmed by <code>Workers</code>. If <code>Executor</code> doesn't get task confirmation from <code>Workers</code> within the given timeout task is considered as timed-out and the <code>Timeout</code> exception is raised.
 
* <code>Worker</code> receives a request message it starts a new thread for processing it.
 
* <code>Worker</code> receives a request message it starts a new thread for processing it.
 
* <code>Worker</code> dispatches request (gets desired endpoint that actually executes task):<br />
 
* <code>Worker</code> dispatches request (gets desired endpoint that actually executes task):<br />
::* If request dispatching <code>succeeded</code> Worker sends confirmation response to the Proxy:<br />
+
::* If request dispatching <code>succeeded</code> Worker sends confirmation response to the Executor:<br />
 
::: <code>{'state': 'RUNNING'}</code>
 
::: <code>{'state': 'RUNNING'}</code>
::* If request dispatching <code>failed</code> Worker sends failure response to the <code>Proxy</code> with <code>misc.Failure</code> object:
+
::* If request dispatching <code>failed</code> Worker sends failure response to the <code>Executor</code> with <code>misc.Failure</code> object:
 
:::<code>{'state': 'FAILURE', 'result': <misc.Failure>}</code>
 
:::<code>{'state': 'FAILURE', 'result': <misc.Failure>}</code>
* <code>Proxy</code> gets task request confirmation from the <code>Worker</code> and task request state changed from the <code>Pending</code> to the <code>Running</code> state. Once task request is in the <code>Running</code> state it can't be timed-out (considering that task execution process may take unpredictable time).
+
* <code>Executor</code> gets task request confirmation from the <code>Worker</code> and task request state changed from the <code>Pending</code> to the <code>Running</code> state. Once task request is in the <code>Running</code> state it can't be timed-out (considering that task execution process may take unpredictable time).
* <code>Worker</code> executes task and once it is finished sends result back to the <code>Proxy</code>:
+
* <code>Worker</code> executes task and once it is finished sends result back to the <code>Executor</code>:
 
::* If result is of the <code>misc.Failure</code> type response with the failure state is sent back:
 
::* If result is of the <code>misc.Failure</code> type response with the failure state is sent back:
 
::: <code>{'state': 'FAILURE', 'event': <event>, 'result': <misc.Failure>)}</code>
 
::: <code>{'state': 'FAILURE', 'event': <event>, 'result': <misc.Failure>)}</code>
 
::* If result is not of the <code>misc.Failure</code> type response with the success state is sent back:
 
::* If result is not of the <code>misc.Failure</code> type response with the success state is sent back:
 
::: <code>{'state': 'SUCCESS', 'event': <event>, 'result': <result>)}</code>
 
::: <code>{'state': 'SUCCESS', 'event': <event>, 'result': <result>)}</code>
* <code>Proxy</code> gets task execution result from <code>Worker</code> and passes it back to the WorkerTaskExecutor and worker-based engine to finish task processing.
+
* <code>Executor</code> gets task execution result from <code>Worker</code> and passes it back to the WorkerTaskExecutor and worker-based engine to finish task processing.
* <code>Worker</code> is subscribed on the task progress and every time task progress event is triggered it sends progress notification to the <code>Proxy</code> where it is handled by engine:
+
* <code>Worker</code> is subscribed on the task progress and every time task progress event is triggered it sends progress notification to the <code>Executor</code> where it is handled by engine:
 
::: <code>{'state': 'PROGRESS', 'event_data': <event_data>, 'progress': <progress>}</code>
 
::: <code>{'state': 'PROGRESS', 'event_data': <event_data>, 'progress': <progress>}</code>
  
 
==== Notes ====
 
==== Notes ====
* <code>misc.Failure</code> objects are not json-serializable, so they are converted to dict before sending and converted from dict after receiving on both <code>Proxy</code>/<code>Worker</code> sides.
+
* <code>misc.Failure</code> objects are not json-serializable, so they are converted to dict before sending and converted from dict after receiving on both <code>Executor</code>/<code>Worker</code> sides.
* When <code>Worker</code> sends response to the <code>Proxy</code> it publishes to the exchange that is taken from the <code>reply_to</code> parameter of the AMQP message to reply to the correct <code>Proxy</code>.
+
* When <code>Worker</code> sends response to the <code>Executor</code> it publishes to the exchange that is taken from the <code>reply_to</code> parameter of the AMQP message to reply to the correct <code>Executor</code>.
  
==== Proxy request format ====
+
==== Executor request format ====
 
* '''task''' - full task name to be performed
 
* '''task''' - full task name to be performed
 
* '''action''' - task action to be performed (e.g. execute, revert)
 
* '''action''' - task action to be performed (e.g. execute, revert)
Line 47: Line 47:
  
 
Additionally, the following parameters are added to the AMQP request message:
 
Additionally, the following parameters are added to the AMQP request message:
* '''reply_to''' - Proxy named exchange workers will send responses back to
+
* '''reply_to''' - Executor named exchange workers will send responses back to
* '''correlation_id''' - Proxy request id (since there can be multiple request being processed simultaneously)
+
* '''correlation_id''' - Executor request id (since there can be multiple request being processed simultaneously)
  
 
==== Worker response format ====
 
==== Worker response format ====

Revision as of 15:36, 22 January 2014

Worker-based engine

Worker-based engine uses remote workers to perform high load, computational and time-consuming tasks. There are two communication sides - Executor and Worker that communicate using Proxy component. Proxy is designed to accept/publish messages from/into named exchange.

Executor

Executor is a part of the worker-based engine and is used to publish task requests, so these requests can be accepted and processed by remote Workers.

Worker

Worker is started on remote host and has list of tasks it can perform. Worker accepts and processes task requests that are published by Executor. Several requests can be processed simultaneously in separate threads (executor can be passed to the Worker and configured). Workers that can process same set of tasks can be grouped under common topic.

Protocol

Executor and Workers communicate in one common named exchange. Executor publishes task requests to the named exchange and Workers take requests from it. When Worker gets task request it parses all parameters, dispatches endpoint and starts task processing. During task processing Worker sends responses back to Executor.

Executor and Worker communication

Let's consider how communication between Executor and Worker happens. First of all engine resolves all tasks dependencies and schedules tasks that can be performed at the moment. Tasks are executed by worker-based engine Executor. Executor initiates task execution/reversion using Proxy.

  • Executor publishes task request (format is described below) into named exchange with routing key, that is used to deliver request to particular workers topic and waits for task requests to be accepted and confirmed by Workers. If Executor doesn't get task confirmation from Workers within the given timeout task is considered as timed-out and the Timeout exception is raised.
  • Worker receives a request message it starts a new thread for processing it.
  • Worker dispatches request (gets desired endpoint that actually executes task):
  • If request dispatching succeeded Worker sends confirmation response to the Executor:
{'state': 'RUNNING'}
  • If request dispatching failed Worker sends failure response to the Executor with misc.Failure object:
{'state': 'FAILURE', 'result': <misc.Failure>}
  • Executor gets task request confirmation from the Worker and task request state changed from the Pending to the Running state. Once task request is in the Running state it can't be timed-out (considering that task execution process may take unpredictable time).
  • Worker executes task and once it is finished sends result back to the Executor:
  • If result is of the misc.Failure type response with the failure state is sent back:
{'state': 'FAILURE', 'event': <event>, 'result': <misc.Failure>)}
  • If result is not of the misc.Failure type response with the success state is sent back:
{'state': 'SUCCESS', 'event': <event>, 'result': <result>)}
  • Executor gets task execution result from Worker and passes it back to the WorkerTaskExecutor and worker-based engine to finish task processing.
  • Worker is subscribed on the task progress and every time task progress event is triggered it sends progress notification to the Executor where it is handled by engine:
{'state': 'PROGRESS', 'event_data': <event_data>, 'progress': <progress>}

Notes

  • misc.Failure objects are not json-serializable, so they are converted to dict before sending and converted from dict after receiving on both Executor/Worker sides.
  • When Worker sends response to the Executor it publishes to the exchange that is taken from the reply_to parameter of the AMQP message to reply to the correct Executor.

Executor request format

  • task - full task name to be performed
  • action - task action to be performed (e.g. execute, revert)
  • arguments - arguments the task action to be called with
  • result - task execution result (result or misc.Failure) [passed to revert only]
  • failures - flow tasks failures map (map of tasks names and misc.Failures) [passed to revert only]
Example

{'task': 'tasks.CallJoe', 'action': 'execute', 'arguments': {'joe_number': 444}}

Additionally, the following parameters are added to the AMQP request message:

  • reply_to - Executor named exchange workers will send responses back to
  • correlation_id - Executor request id (since there can be multiple request being processed simultaneously)

Worker response format

  • Task running: {'status': 'RUNNING'}
  • Task progress: {'state': 'PROGRESS', 'event_data': <event_data>, 'progress': <progress>}
  • Task succeeded: {'state': 'SUCCESS', 'event': <event>, 'result': <result>)}
  • Task failed: {'state': 'FAILURE', 'event': <event>, 'result': <misc.Failure>)}