TaskFlow/Worker-based Engine Protocol
Worker-based engine
The worker-based engine uses remote workers to perform high-load, computationally intensive, and time-consuming tasks. There are two communicating sides, the Executor and the Worker, which communicate through the Proxy component. The Proxy accepts messages from, and publishes messages into, a named exchange.
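The Proxy's role can be illustrated with a toy, in-memory named exchange. This is a sketch under stated assumptions, not TaskFlow code. `InMemoryExchange`, `subscribe` and `publish` are hypothetical names. The round-robin delivery assumes that Workers sharing a topic compete for requests the way consumers of a shared AMQP queue do; a real deployment would use an AMQP broker.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Message = Dict[str, Any]


class InMemoryExchange:
    """Hypothetical toy named exchange; not the TaskFlow Proxy API or an AMQP client."""

    def __init__(self, name: str) -> None:
        self.name = name
        # routing key (a worker topic) -> consumers competing for messages on that key
        self._consumers: Dict[str, List[Callable[[Message], None]]] = defaultdict(list)
        self._next: Dict[str, int] = defaultdict(int)

    def subscribe(self, routing_key: str, callback: Callable[[Message], None]) -> None:
        """Bind a consumer (e.g. a Worker) to a routing key (e.g. its topic)."""
        self._consumers[routing_key].append(callback)

    def publish(self, routing_key: str, message: Message) -> bool:
        """Hand the message to one consumer of the key, round-robin; False if none is bound."""
        consumers = self._consumers.get(routing_key)
        if not consumers:
            return False
        index = self._next[routing_key] % len(consumers)
        self._next[routing_key] += 1
        consumers[index](message)
        return True


# Usage: two Workers share one topic, so each request reaches only one of them.
exchange = InMemoryExchange("test-exchange")
worker_a, worker_b = [], []
exchange.subscribe("test-topic", worker_a.append)
exchange.subscribe("test-topic", worker_b.append)
exchange.publish("test-topic", {"task": "tasks.CallJoe", "action": "execute",
                                "arguments": {"joe_number": 1}})
exchange.publish("test-topic", {"task": "tasks.CallJoe", "action": "execute",
                                "arguments": {"joe_number": 2}})
```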
Executor
The Executor is part of the worker-based engine and publishes task requests so that they can be accepted and processed by remote Workers.
Worker
A Worker is started on a remote host and has a list of tasks it can perform. The Worker accepts and processes task requests that are published by the Executor. Several requests can be processed simultaneously in separate threads (an executor can be passed to the Worker and configured). Workers that can process the same set of tasks can be grouped under a common topic.
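The Worker description above can be sketched with a `concurrent.futures` thread pool standing in for the configurable executor. This assumes that the executor passed to a Worker behaves like a `concurrent.futures.Executor`. `SketchWorker` and `handle` are hypothetical names, and only the `execute` action is modeled.

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, Optional


class SketchWorker:
    """Hypothetical Worker sketch; names are illustrative, not the TaskFlow API."""

    def __init__(self, topic: str, tasks: Dict[str, Callable[..., Any]],
                 executor: Optional[Executor] = None) -> None:
        self.topic = topic
        self.tasks = tasks  # task name -> callable that performs the task
        # concurrent.futures executor (a thread pool, unrelated to the protocol's Executor);
        # the caller may pass a configured one, otherwise a small pool is created.
        self._pool = executor if executor is not None else ThreadPoolExecutor(max_workers=4)

    def handle(self, request: Dict[str, Any]) -> Future:
        """Submit one request to the pool so several requests can run simultaneously.

        Only the 'execute' action is modeled: the task callable is run with the arguments.
        """
        task = self.tasks[request["task"]]
        return self._pool.submit(task, **request.get("arguments", {}))


def call_joe(joe_number: int) -> str:
    return "called %d" % joe_number


worker = SketchWorker("test-topic", {"tasks.CallJoe": call_joe},
                      executor=ThreadPoolExecutor(max_workers=2))
futures = [worker.handle({"task": "tasks.CallJoe", "action": "execute",
                          "arguments": {"joe_number": n}}) for n in (444, 555)]
results = [f.result() for f in futures]
```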
Protocol
The Executor and Workers communicate through one common named exchange. The Executor publishes task requests to the named exchange, and Workers take requests from it. When a Worker gets a task request, it parses all parameters, dispatches the endpoint, and starts task processing. During task processing, the Worker sends responses back to the Executor.
How the Executor gets the task list of each topic
Initially the Executor has a list of worker topics, but not the list of tasks for each topic. So, when the Executor is started, it begins to send Notify messages to all topics. Once a topic (actually a topic worker) receives the Notify message, it responds with its topic name and task list:
{'topic': 'test-topic', 'tasks': [u'taskflow.tests.utils.TaskOneArgOneReturn', u'taskflow.tests.utils.TaskMultiArgOneReturn']}
Once the Executor has received the list of tasks, it publishes all cached (waiting) requests to the proper topic, so that the topic worker starts task execution.
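The discovery step can be sketched as Executor-side bookkeeping that caches requests until a Notify reply names a topic able to run them. This is a sketch under assumptions, not TaskFlow's implementation:

- `TopicTaskRegistry`, `on_notify_reply` and the `publish` hook are hypothetical names.
- Sending the Notify messages themselves is omitted.
- Each task is assumed to be served by the first topic that lists it.

```python
from typing import Any, Callable, Dict, List, Optional, Set

Request = Dict[str, Any]


class TopicTaskRegistry:
    """Hypothetical Executor-side bookkeeping; names are illustrative, not TaskFlow's."""

    def __init__(self, topics: List[str], publish: Callable[[str, Request], None]) -> None:
        self.topics = list(topics)                   # worker topics known up front
        self.topic_tasks: Dict[str, Set[str]] = {}   # filled from Notify replies
        self._waiting: List[Request] = []            # requests cached until a topic is known
        self._publish = publish                      # callable(topic, request)

    def unanswered_topics(self) -> List[str]:
        """Topics that have not replied to Notify yet (still worth notifying)."""
        return [t for t in self.topics if t not in self.topic_tasks]

    def _topic_for(self, task_name: str) -> Optional[str]:
        for topic, tasks in self.topic_tasks.items():
            if task_name in tasks:
                return topic
        return None

    def submit(self, request: Request) -> None:
        topic = self._topic_for(request["task"])
        if topic is None:
            self._waiting.append(request)  # no worker has announced this task yet
        else:
            self._publish(topic, request)

    def on_notify_reply(self, reply: Dict[str, Any]) -> None:
        """Record a {'topic': ..., 'tasks': [...]} reply and flush requests it can serve."""
        self.topic_tasks[reply["topic"]] = set(reply["tasks"])
        waiting, self._waiting = self._waiting, []
        for request in waiting:
            self.submit(request)  # re-caches any request that still has no topic


published = []
registry = TopicTaskRegistry(["test-topic"],
                             lambda topic, req: published.append((topic, req["task"])))
registry.submit({"task": "taskflow.tests.utils.TaskOneArgOneReturn",
                 "action": "execute", "arguments": {}})
before_reply = list(published)  # still empty: the request is cached
registry.on_notify_reply({"topic": "test-topic",
                          "tasks": ["taskflow.tests.utils.TaskOneArgOneReturn",
                                    "taskflow.tests.utils.TaskMultiArgOneReturn"]})
```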
Executor and Worker communication
Let's consider how communication between Executor and Worker happens.
First of all, the engine resolves all task dependencies and schedules the tasks that can be performed at the moment. Tasks are executed by the worker-based engine's Executor, which initiates task execution/reversion using the Proxy.
- The Executor publishes a task request (the format is described below) into the named exchange. The request carries a routing key that delivers it to a particular worker topic. The Executor then waits for the task request to be accepted and confirmed by Workers. If the Executor doesn't get a task confirmation from Workers within the given timeout, the task is considered timed out and the Timeout exception is raised.
- The Worker receives the request message and starts a new thread for processing it.
- The Worker dispatches the request (gets the desired endpoint that actually executes the task):
  - If request dispatching succeeded, the Worker sends a confirmation response to the Executor:
    {'state': 'RUNNING'}
  - If request dispatching failed, the Worker sends a failure response with a misc.Failure object to the Executor:
    {'state': 'FAILURE', 'result': <misc.Failure>}
- The Executor gets the task request confirmation from the Worker, and the task request state changes from Pending to Running. Once a task request is in the Running state, it can't time out, because task execution may take an unpredictable amount of time.
- The Worker executes the task and, once it is finished, sends the result back to the Executor:
  - If the result is of the misc.Failure type, a response with the failure state is sent back:
    {'state': 'FAILURE', 'event': <event>, 'result': <misc.Failure>}
  - If the result is not of the misc.Failure type, a response with the success state is sent back:
    {'state': 'SUCCESS', 'event': <event>, 'result': <result>}
- The Executor gets the task execution result from the Worker and passes it back to the WorkerTaskExecutor and the worker-based engine to finish task processing.
- The Worker is subscribed to task progress. Every time a task progress event is triggered, it sends a progress notification to the Executor, where it is handled by the engine:
  {'state': 'PROGRESS', 'event_data': <event_data>, 'progress': <progress>}
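The Executor's side of these steps can be sketched as a small per-request state machine: a request times out only while Pending and stops being subject to the timeout once a Worker confirms it. `TrackedRequest` and `RequestTimeout` are hypothetical names, and the sketch ignores the `event` key. The explicit `now` parameter is an assumption added to keep the example deterministic.

```python
import time
from typing import Any, Dict, Optional


class RequestTimeout(Exception):
    """Hypothetical stand-in for the Timeout exception raised by the Executor."""


class TrackedRequest:
    """Hypothetical Executor-side view of one request; not TaskFlow's implementation."""

    def __init__(self, correlation_id: str, timeout: float, now: Optional[float] = None) -> None:
        self.correlation_id = correlation_id
        self.state = "PENDING"
        self.result: Any = None
        start = time.monotonic() if now is None else now
        self._deadline = start + timeout

    def check_timeout(self, now: Optional[float] = None) -> None:
        """Raise only while Pending; a Running request may take unpredictable time."""
        now = time.monotonic() if now is None else now
        if self.state == "PENDING" and now > self._deadline:
            raise RequestTimeout(self.correlation_id)

    def on_response(self, response: Dict[str, Any]) -> None:
        state = response["state"]
        if state == "RUNNING" and self.state == "PENDING":
            self.state = "RUNNING"  # confirmed by a Worker, so no longer subject to timeout
        elif state in ("SUCCESS", "FAILURE"):
            # FAILURE may also arrive while Pending, when dispatching failed
            self.state = state
            self.result = response.get("result")
        # PROGRESS responses do not change the state; the engine handles them separately


# Usage with explicit clock values so the example is deterministic.
confirmed = TrackedRequest("req-1", timeout=5.0, now=0.0)
confirmed.on_response({"state": "RUNNING"})
confirmed.check_timeout(now=100.0)  # no exception: the request is Running
confirmed.on_response({"state": "SUCCESS", "result": 42})

unconfirmed = TrackedRequest("req-2", timeout=5.0, now=0.0)
try:
    unconfirmed.check_timeout(now=10.0)
    timed_out = False
except RequestTimeout:
    timed_out = True
```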
Notes
- misc.Failure objects are not JSON-serializable. On both the Executor and Worker sides, they are converted to dicts before sending and rebuilt from dicts after receiving.
- When the Worker sends a response to the Executor, it publishes it to the exchange taken from the reply_to parameter of the AMQP message, so that the reply reaches the correct Executor.
- It is up to the end user to make sure that all workers in a topic have the same set of tasks.
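The failure round trip described in the first note can be sketched with `json`. `SketchFailure` is a hypothetical stand-in for misc.Failure, and its two fields are illustrative only; the real class carries more information. The sketch also assumes that a FAILURE response always carries a failure as its `result`, as in the protocol above.

```python
import json
from typing import Any, Dict


class SketchFailure:
    """Hypothetical stand-in for misc.Failure; its fields are illustrative only."""

    def __init__(self, exc_type_name: str, message: str) -> None:
        self.exc_type_name = exc_type_name
        self.message = message

    def to_dict(self) -> Dict[str, str]:
        return {"exc_type_name": self.exc_type_name, "message": self.message}

    @classmethod
    def from_dict(cls, data: Dict[str, str]) -> "SketchFailure":
        return cls(data["exc_type_name"], data["message"])

    def __eq__(self, other: object) -> bool:
        return isinstance(other, SketchFailure) and self.to_dict() == other.to_dict()


def encode_response(response: Dict[str, Any]) -> str:
    """Sending side: replace a failure object with its dict so json.dumps succeeds."""
    result = response.get("result")
    if isinstance(result, SketchFailure):
        response = dict(response, result=result.to_dict())
    return json.dumps(response)


def decode_response(payload: str) -> Dict[str, Any]:
    """Receiving side: rebuild the failure object from its dict for FAILURE responses."""
    response = json.loads(payload)
    if response.get("state") == "FAILURE" and "result" in response:
        response["result"] = SketchFailure.from_dict(response["result"])
    return response


failure = SketchFailure("RuntimeError", "Joe did not answer")
wire = encode_response({"state": "FAILURE", "result": failure})
decoded = decode_response(wire)
```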
Executor request format
- task - full name of the task to be performed
- action - task action to be performed (e.g. execute, revert)
- arguments - arguments the task action is to be called with
- result - task execution result (a result or a misc.Failure) [passed to revert only]
- failures - the flow's task failures map (task names mapped to misc.Failure objects) [passed to revert only]
Example
{'task': 'tasks.CallJoe', 'action': 'execute', 'arguments': {'joe_number': 444}}
Additionally, the following parameters are added to the AMQP request message:
- reply_to - the Executor's named exchange that workers send responses back to
- correlation_id - the Executor's request id (needed because multiple requests can be processed simultaneously)
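A request body and its two AMQP properties can be assembled together. `build_request` and the exchange name `executor-exchange` are hypothetical. Generating `correlation_id` with `uuid.uuid4()` is an assumption. How the properties are actually attached to a message depends on the AMQP client library and is not shown.

```python
import uuid
from typing import Any, Dict, Optional, Tuple


def build_request(task: str, action: str, arguments: Dict[str, Any], reply_to: str,
                  result: Any = None, failures: Optional[Dict[str, Any]] = None
                  ) -> Tuple[Dict[str, Any], Dict[str, str]]:
    """Return (body, message properties) for one task request (hypothetical helper)."""
    body: Dict[str, Any] = {"task": task, "action": action, "arguments": arguments}
    if action == "revert":
        # result and failures are only passed to revert
        body["result"] = result
        body["failures"] = failures if failures is not None else {}
    properties = {
        "reply_to": reply_to,                 # Executor exchange that Workers reply to
        "correlation_id": str(uuid.uuid4()),  # distinguishes concurrent requests
    }
    return body, properties


body, props = build_request("tasks.CallJoe", "execute", {"joe_number": 444},
                            reply_to="executor-exchange")
```

For an `execute` action the body matches the example request above; for `revert`, the `result` and `failures` fields are added.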
Worker response format
- Task running:
{'state': 'RUNNING'}
- Task progress:
{'state': 'PROGRESS', 'event_data': <event_data>, 'progress': <progress>}
- Task succeeded:
{'state': 'SUCCESS', 'event': <event>, 'result': <result>}
- Task failed:
{'state': 'FAILURE', 'event': <event>, 'result': <misc.Failure>}
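An Executor could check incoming responses against these formats before acting on them. This sketch assumes every response is keyed on `'state'`, as in the protocol walkthrough above. `REQUIRED_KEYS` and `validate_response` are hypothetical names. `'event'` is not required for FAILURE because the dispatch-failure response omits it, and the values used in the usage lines are illustrative.

```python
from typing import Any, Dict, Set

# Keys each response state must carry besides 'state' (hypothetical validation table).
REQUIRED_KEYS: Dict[str, Set[str]] = {
    "RUNNING": set(),
    "PROGRESS": {"event_data", "progress"},
    "SUCCESS": {"event", "result"},
    "FAILURE": {"result"},  # 'event' is absent when dispatching itself failed
}


def validate_response(response: Dict[str, Any]) -> str:
    """Return the response state, or raise ValueError if the message is malformed."""
    state = response.get("state")
    if state not in REQUIRED_KEYS:
        raise ValueError("unknown state: %r" % (state,))
    missing = REQUIRED_KEYS[state] - set(response)
    if missing:
        raise ValueError("%s response missing %s" % (state, sorted(missing)))
    return state


state = validate_response({"state": "PROGRESS", "event_data": {}, "progress": 0.5})
try:
    validate_response({"state": "SUCCESS", "result": 42})  # missing 'event'
    rejected = False
except ValueError:
    rejected = True
```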