Difference between revisions of "TaskFlow/Worker-based Engine Protocol"
(3 intermediate revisions by the same user not shown) | |||
Line 11: | Line 11: | ||
== Protocol == | == Protocol == | ||
<code>Executor</code> and <code>Workers</code> communicate in one common named exchange. <code>Executor</code> publishes task requests to the named exchange and <code>Workers</code> take requests from it. When <code>Worker</code> gets task request it parses all parameters, dispatches endpoint and starts task processing. During task processing <code>Worker</code> sends responses back to <code>Executor</code>. | <code>Executor</code> and <code>Workers</code> communicate in one common named exchange. <code>Executor</code> publishes task requests to the named exchange and <code>Workers</code> take requests from it. When <code>Worker</code> gets task request it parses all parameters, dispatches endpoint and starts task processing. During task processing <code>Worker</code> sends responses back to <code>Executor</code>. | ||
+ | |||
+ | ==== Getting topic tasks list by Executor ==== | ||
+ | Initially <code>Executor</code> has list of worker topics, but does not have list of topic tasks. So, when <code>Executor</code> is started it begins to send the <code>Notify</code> messages to all topics. Once topic (actually topic worker) receives the <code>Notify</code> message it responses back with its <code>topic</code> name and <code>tasks</code> list: | ||
+ | |||
+ | <pre>{'topic': 'test-topic', 'tasks': [u'taskflow.tests.utils.TaskOneArgOneReturn', u'taskflow.tests.utils.TaskMultiArgOneReturn']}</pre> | ||
+ | |||
+ | Once <code>Executor</code> received list of tasks it publishes all cached (waiting) requests to the proper topic, so topic worker will start task execution. | ||
==== Executor and Worker communication ==== | ==== Executor and Worker communication ==== | ||
Line 16: | Line 23: | ||
First of all engine resolves all tasks dependencies and schedules tasks that can be performed at the moment. Tasks are executed by worker-based engine <code>Executor</code>. <code>Executor</code> initiates task execution/reversion using <code>Proxy</code>. | First of all engine resolves all tasks dependencies and schedules tasks that can be performed at the moment. Tasks are executed by worker-based engine <code>Executor</code>. <code>Executor</code> initiates task execution/reversion using <code>Proxy</code>. | ||
* <code>Executor</code> publishes task request (format is described below) into named exchange with <code>routing key</code>, that is used to deliver request to particular workers <code>topic</code> and waits for task requests to be accepted and confirmed by <code>Workers</code>. If <code>Executor</code> doesn't get task confirmation from <code>Workers</code> within the given timeout task is considered as timed-out and the <code>Timeout</code> exception is raised. | * <code>Executor</code> publishes task request (format is described below) into named exchange with <code>routing key</code>, that is used to deliver request to particular workers <code>topic</code> and waits for task requests to be accepted and confirmed by <code>Workers</code>. If <code>Executor</code> doesn't get task confirmation from <code>Workers</code> within the given timeout task is considered as timed-out and the <code>Timeout</code> exception is raised. | ||
− | * <code>Worker</code> receives a request message | + | * <code>Worker</code> receives a request message and starts a new thread for processing it. |
* <code>Worker</code> dispatches request (gets desired endpoint that actually executes task):<br /> | * <code>Worker</code> dispatches request (gets desired endpoint that actually executes task):<br /> | ||
::* If request dispatching <code>succeeded</code> Worker sends confirmation response to the Executor:<br /> | ::* If request dispatching <code>succeeded</code> Worker sends confirmation response to the Executor:<br /> | ||
− | ::: < | + | ::: <pre>{'state': 'RUNNING'}</pre> |
::* If request dispatching <code>failed</code> Worker sends failure response to the <code>Executor</code> with <code>misc.Failure</code> object: | ::* If request dispatching <code>failed</code> Worker sends failure response to the <code>Executor</code> with <code>misc.Failure</code> object: | ||
− | :::< | + | :::<pre>{'state': 'FAILURE', 'result': <misc.Failure>}</pre> |
* <code>Executor</code> gets task request confirmation from the <code>Worker</code> and task request state changed from the <code>Pending</code> to the <code>Running</code> state. Once task request is in the <code>Running</code> state it can't be timed-out (considering that task execution process may take unpredictable time). | * <code>Executor</code> gets task request confirmation from the <code>Worker</code> and task request state changed from the <code>Pending</code> to the <code>Running</code> state. Once task request is in the <code>Running</code> state it can't be timed-out (considering that task execution process may take unpredictable time). | ||
* <code>Worker</code> executes task and once it is finished sends result back to the <code>Executor</code>: | * <code>Worker</code> executes task and once it is finished sends result back to the <code>Executor</code>: | ||
::* If result is of the <code>misc.Failure</code> type response with the failure state is sent back: | ::* If result is of the <code>misc.Failure</code> type response with the failure state is sent back: | ||
− | ::: < | + | ::: <pre>{'state': 'FAILURE', 'event': <event>, 'result': <misc.Failure>)}</pre> |
::* If result is not of the <code>misc.Failure</code> type response with the success state is sent back: | ::* If result is not of the <code>misc.Failure</code> type response with the success state is sent back: | ||
− | ::: < | + | ::: <pre>{'state': 'SUCCESS', 'event': <event>, 'result': <result>)}</pre> |
* <code>Executor</code> gets task execution result from <code>Worker</code> and passes it back to the WorkerTaskExecutor and worker-based engine to finish task processing. | * <code>Executor</code> gets task execution result from <code>Worker</code> and passes it back to the WorkerTaskExecutor and worker-based engine to finish task processing. | ||
* <code>Worker</code> is subscribed on the task progress and every time task progress event is triggered it sends progress notification to the <code>Executor</code> where it is handled by engine: | * <code>Worker</code> is subscribed on the task progress and every time task progress event is triggered it sends progress notification to the <code>Executor</code> where it is handled by engine: | ||
− | ::: < | + | ::: <pre>{'state': 'PROGRESS', 'event_data': <event_data>, 'progress': <progress>}</pre> |
==== Notes ==== | ==== Notes ==== | ||
* <code>misc.Failure</code> objects are not json-serializable, so they are converted to dict before sending and converted from dict after receiving on both <code>Executor</code>/<code>Worker</code> sides. | * <code>misc.Failure</code> objects are not json-serializable, so they are converted to dict before sending and converted from dict after receiving on both <code>Executor</code>/<code>Worker</code> sides. | ||
* When <code>Worker</code> sends response to the <code>Executor</code> it publishes to the exchange that is taken from the <code>reply_to</code> parameter of the AMQP message to reply to the correct <code>Executor</code>. | * When <code>Worker</code> sends response to the <code>Executor</code> it publishes to the exchange that is taken from the <code>reply_to</code> parameter of the AMQP message to reply to the correct <code>Executor</code>. | ||
+ | * It is up to the final user to make sure that all workers in the topic have the same set of tasks. | ||
==== Executor request format ==== | ==== Executor request format ==== | ||
Line 44: | Line 52: | ||
===== Example ===== | ===== Example ===== | ||
− | < | + | <pre>{'task': 'tasks.CallJoe', 'action': 'execute', 'arguments': {'joe_number': 444}}</pre> |
Additionally, the following parameters are added to the AMQP request message: | Additionally, the following parameters are added to the AMQP request message: | ||
Line 51: | Line 59: | ||
==== Worker response format ==== | ==== Worker response format ==== | ||
− | * Task '''running''': < | + | * Task '''running''': |
− | * Task '''progress''': < | + | <pre>{'status': 'RUNNING'}</pre> |
− | * Task '''succeeded''': < | + | * Task '''progress''': |
− | * Task '''failed''': < | + | <pre>{'state': 'PROGRESS', 'event_data': <event_data>, 'progress': <progress>}</pre> |
+ | * Task '''succeeded''': | ||
+ | <pre>{'state': 'SUCCESS', 'event': <event>, 'result': <result>)}</pre> | ||
+ | * Task '''failed''': | ||
+ | <pre>{'state': 'FAILURE', 'event': <event>, 'result': <misc.Failure>)}</pre> |
Latest revision as of 16:00, 19 March 2014
Contents
Worker-based engine
Worker-based engine
uses remote workers to perform high load, computational and time-consuming tasks. There are two communication sides - Executor
and Worker
that communicate using Proxy
component. Proxy
is designed to accept/publish messages from/into named exchange.
Executor
Executor
is a part of the worker-based engine and is used to publish task requests, so these requests can be accepted and processed by remote Workers
.
Worker
Worker
is started on remote host and has list of tasks it can perform. Worker
accepts and processes task requests that are published by Executor
. Several requests can be processed simultaneously in separate threads (executor can be passed to the Worker
and configured). Workers that can process same set of tasks can be grouped under common topic
.
Protocol
Executor
and Workers
communicate in one common named exchange. Executor
publishes task requests to the named exchange and Workers
take requests from it. When Worker
gets task request it parses all parameters, dispatches endpoint and starts task processing. During task processing Worker
sends responses back to Executor
.
Getting topic tasks list by Executor
Initially Executor
has list of worker topics, but does not have list of topic tasks. So, when Executor
is started it begins to send the Notify
messages to all topics. Once topic (actually topic worker) receives the Notify
message it responses back with its topic
name and tasks
list:
{'topic': 'test-topic', 'tasks': [u'taskflow.tests.utils.TaskOneArgOneReturn', u'taskflow.tests.utils.TaskMultiArgOneReturn']}
Once Executor
received list of tasks it publishes all cached (waiting) requests to the proper topic, so topic worker will start task execution.
Executor and Worker communication
Let's consider how communication between Executor
and Worker
happens.
First of all engine resolves all tasks dependencies and schedules tasks that can be performed at the moment. Tasks are executed by worker-based engine Executor
. Executor
initiates task execution/reversion using Proxy
.
-
Executor
publishes task request (format is described below) into named exchange withrouting key
, that is used to deliver request to particular workerstopic
and waits for task requests to be accepted and confirmed byWorkers
. IfExecutor
doesn't get task confirmation fromWorkers
within the given timeout task is considered as timed-out and theTimeout
exception is raised. -
Worker
receives a request message and starts a new thread for processing it. -
Worker
dispatches request (gets desired endpoint that actually executes task):
- If request dispatching
succeeded
Worker sends confirmation response to the Executor:
-
{'state': 'RUNNING'}
- If request dispatching
failed
Worker sends failure response to theExecutor
withmisc.Failure
object:
{'state': 'FAILURE', 'result': <misc.Failure>}
- If request dispatching
-
Executor
gets task request confirmation from theWorker
and task request state changed from thePending
to theRunning
state. Once task request is in theRunning
state it can't be timed-out (considering that task execution process may take unpredictable time). -
Worker
executes task and once it is finished sends result back to theExecutor
:
- If result is of the
misc.Failure
type response with the failure state is sent back:
-
{'state': 'FAILURE', 'event': <event>, 'result': <misc.Failure>)}
- If result is not of the
misc.Failure
type response with the success state is sent back:
-
{'state': 'SUCCESS', 'event': <event>, 'result': <result>)}
- If result is of the
-
Executor
gets task execution result fromWorker
and passes it back to the WorkerTaskExecutor and worker-based engine to finish task processing. -
Worker
is subscribed on the task progress and every time task progress event is triggered it sends progress notification to theExecutor
where it is handled by engine:
-
{'state': 'PROGRESS', 'event_data': <event_data>, 'progress': <progress>}
-
Notes
-
misc.Failure
objects are not json-serializable, so they are converted to dict before sending and converted from dict after receiving on bothExecutor
/Worker
sides. - When
Worker
sends response to theExecutor
it publishes to the exchange that is taken from thereply_to
parameter of the AMQP message to reply to the correctExecutor
. - It is up to the final user to make sure that all workers in the topic have the same set of tasks.
Executor request format
- task - full task name to be performed
- action - task action to be performed (e.g. execute, revert)
- arguments - arguments the task action to be called with
- result - task execution result (result or misc.Failure) [passed to revert only]
- failures - flow tasks failures map (map of tasks names and misc.Failures) [passed to revert only]
Example
{'task': 'tasks.CallJoe', 'action': 'execute', 'arguments': {'joe_number': 444}}
Additionally, the following parameters are added to the AMQP request message:
- reply_to - Executor named exchange workers will send responses back to
- correlation_id - Executor request id (since there can be multiple request being processed simultaneously)
Worker response format
- Task running:
{'status': 'RUNNING'}
- Task progress:
{'state': 'PROGRESS', 'event_data': <event_data>, 'progress': <progress>}
- Task succeeded:
{'state': 'SUCCESS', 'event': <event>, 'result': <result>)}
- Task failed:
{'state': 'FAILURE', 'event': <event>, 'result': <misc.Failure>)}