Difference between revisions of "TaskFlow/Worker-based Engine"
< TaskFlow
(→Worker-based engine) |
|||
Line 1: | Line 1: | ||
== Worker-based engine == | == Worker-based engine == | ||
=== Communication protocol === | === Communication protocol === | ||
+ | Worker-based engine communication protocol is described [[TaskFlow/Worker-based_Engine_Protocol|here]]. | ||
=== Usage === | === Usage === | ||
+ | Let's consider how the <code>worker-based engine</code> can be applied. First, it is needed to configure <code>worker</code> and <code>flow</code> to be used with the <code>worker-based engine</code>. After configuration is done, <code>worker</code> can be started and <code>flow</code> can be run. | ||
+ | |||
+ | ==== Worker configuration==== | ||
+ | |||
+ | ===== Format ===== | ||
+ | * <code>url</code> - broker connection url (see format in kombu [https://kombu.readthedocs.org/en/latest/reference/kombu.html#connection documentation]); | ||
+ | * <code>exchange</code> - exchange name in which <code>executor</code> / <code>worker</code> communication is performed; | ||
+ | * <code>topic</code> - topic name under which workers with the same set of tasks are started; | ||
+ | * <code>tasks</code> - list of tasks worker is able to perform; | ||
+ | * <code>transport</code> - can be used to specify the transport explicitly (for <code>memory</code> and <code>filesystem</code> transports); | ||
+ | * <code>transport_options</code> - transport options (<code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code>) transport. | ||
+ | |||
+ | ===== Example with AMQP transport ===== | ||
+ | <pre> | ||
+ | from taskflow.engines.worker_based import worker as w | ||
+ | |||
+ | config = { | ||
+ | 'url': 'amqp://guest:guest@localhost:5672//' | ||
+ | 'exchange': 'test-exchange', | ||
+ | 'topic': 'test-tasks', | ||
+ | 'tasks': ['tasks:TestTask1', 'tasks:TestTask2'] | ||
+ | } | ||
+ | worker = w.Worker(**config) | ||
+ | worker.run() | ||
+ | </pre> | ||
+ | |||
+ | ===== Example filesystem transport ===== | ||
+ | <pre> | ||
+ | from taskflow.engines.worker_based import worker as w | ||
+ | |||
+ | config = { | ||
+ | 'exchange': 'test-exchange', | ||
+ | 'topic': 'test-tasks', | ||
+ | 'tasks': ['tasks:TestTask1', 'tasks:TestTask2'], | ||
+ | 'transport': 'filesystem', | ||
+ | 'transport_options': { | ||
+ | 'data_folder_in': '/tmp/test', | ||
+ | 'data_folder_out': '/tmp/test' | ||
+ | } | ||
+ | } | ||
+ | worker = w.Worker(**config) | ||
+ | worker.run() | ||
+ | </pre> | ||
+ | |||
+ | ==== Flow configuration ==== | ||
+ | |||
+ | ===== Format ===== | ||
+ | * <code>engine</code> - engine type to be used ('worker-based' in our case); | ||
+ | * <code>url</code> - broker connection url (see format in kombu [https://kombu.readthedocs.org/en/latest/reference/kombu.html#connection documentation]); | ||
+ | * <code>exchange</code> - exchange name in which <code>executor</code> / <code>worker</code> communication is performed; | ||
+ | * <code>transport</code> - can be used to specify the transport explicitly (for <code>memory</code> and <code>filesystem</code> transports); | ||
+ | * <code>transport_options</code> - transport options (<code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code>) transport; | ||
+ | * <code>workers_info</code> - information about workers (topic-to-tasks map); | ||
+ | |||
+ | ===== Example with AMQP transport ===== | ||
+ | <pre> | ||
+ | ... | ||
+ | engine_conf = { | ||
+ | 'engine': 'worker-based', | ||
+ | 'url': 'amqp://guest:guest@localhost:5672//' | ||
+ | 'exchange': 'test-exchange', | ||
+ | 'workers_info': { | ||
+ | 'test-topic': ['tasks:TestTask1', 'tasks:TestTask2'] | ||
+ | } | ||
+ | } | ||
+ | flow = lf.Flow('simple-linear').add(...) | ||
+ | eng = taskflow.engines.load(flow, engine_conf=engine_conf) | ||
+ | eng.run() | ||
+ | </pre> | ||
+ | |||
+ | ===== Example with filesystem transport ===== | ||
+ | <pre> | ||
+ | ... | ||
+ | engine_conf = { | ||
+ | 'engine': 'worker-based', | ||
+ | 'exchange': 'test-exchange', | ||
+ | 'transport': 'filesystem', | ||
+ | 'transport_options': { | ||
+ | 'data_folder_in': '/tmp/test', | ||
+ | 'data_folder_out': '/tmp/test' | ||
+ | } | ||
+ | 'workers_info': { | ||
+ | 'test-topic': ['tasks:TestTask1', 'tasks:TestTask2'] | ||
+ | } | ||
+ | |||
+ | } | ||
+ | flow = lf.Flow('simple-linear').add(...) | ||
+ | eng = taskflow.engines.load(flow, engine_conf=engine_conf) | ||
+ | eng.run() | ||
+ | </pre> | ||
=== Development state === | === Development state === | ||
− | Currently, basic and fully working implementation of the worker-based engine is | + | Currently, basic and fully working implementation of the <code>worker-based engine</code> is implemented. But it is not recommended to use it in the production yet due to the following issues that are needed to be resolved: |
− | * | + | * '''Collecting information from workers'''. Currently, it is necessary to specify worker topics and their tasks in the engine configuration. But this information should be collected from workers automatically using some kind of broadcast messages. This will resolve problem with adding new workers on the fly too. |
− | * | + | * '''Engine failover'''. Currently, worker send task results without getting confirmation from the engine that it was delivered. In case when task is being performed on worker and engine failed unexpectedly, worker will send results to engine that won't get it. This may lead to some problems. |
+ | * '''Worker failover'''. Currently, if worker started task execution and then failed unexpectedly, executor will wait forever until task is finished. This behavior may lead to flow hanging. | ||
+ | Some kind of keep-alive messages should be sent from worker, so executor would know that task is still being performed. Otherwise, mark request as failed. |
Revision as of 15:23, 21 January 2014
Contents
Worker-based engine
Communication protocol
Worker-based engine communication protocol is described here.
Usage
Let's consider how the worker-based engine
can be applied. First, it is needed to configure worker
and flow
to be used with the worker-based engine
. After configuration is done, worker
can be started and flow
can be run.
Worker configuration
Format
-
url
- broker connection url (see format in kombu documentation); -
exchange
- exchange name in whichexecutor
/worker
communication is performed; -
topic
- topic name under which workers with the same set of tasks are started; -
tasks
- list of tasks worker is able to perform; -
transport
- can be used to specify the transport explicitly (formemory
andfilesystem
transports); -
transport_options
- transport options (data_folder_in
anddata_folder_out
parameters for thefilesystem
) transport.
Example with AMQP transport
from taskflow.engines.worker_based import worker as w config = { 'url': 'amqp://guest:guest@localhost:5672//' 'exchange': 'test-exchange', 'topic': 'test-tasks', 'tasks': ['tasks:TestTask1', 'tasks:TestTask2'] } worker = w.Worker(**config) worker.run()
Example filesystem transport
from taskflow.engines.worker_based import worker as w config = { 'exchange': 'test-exchange', 'topic': 'test-tasks', 'tasks': ['tasks:TestTask1', 'tasks:TestTask2'], 'transport': 'filesystem', 'transport_options': { 'data_folder_in': '/tmp/test', 'data_folder_out': '/tmp/test' } } worker = w.Worker(**config) worker.run()
Flow configuration
Format
-
engine
- engine type to be used ('worker-based' in our case); -
url
- broker connection url (see format in kombu documentation); -
exchange
- exchange name in whichexecutor
/worker
communication is performed; -
transport
- can be used to specify the transport explicitly (formemory
andfilesystem
transports); -
transport_options
- transport options (data_folder_in
anddata_folder_out
parameters for thefilesystem
) transport; -
workers_info
- information about workers (topic-to-tasks map);
Example with AMQP transport
... engine_conf = { 'engine': 'worker-based', 'url': 'amqp://guest:guest@localhost:5672//' 'exchange': 'test-exchange', 'workers_info': { 'test-topic': ['tasks:TestTask1', 'tasks:TestTask2'] } } flow = lf.Flow('simple-linear').add(...) eng = taskflow.engines.load(flow, engine_conf=engine_conf) eng.run()
Example with filesystem transport
... engine_conf = { 'engine': 'worker-based', 'exchange': 'test-exchange', 'transport': 'filesystem', 'transport_options': { 'data_folder_in': '/tmp/test', 'data_folder_out': '/tmp/test' } 'workers_info': { 'test-topic': ['tasks:TestTask1', 'tasks:TestTask2'] } } flow = lf.Flow('simple-linear').add(...) eng = taskflow.engines.load(flow, engine_conf=engine_conf) eng.run()
Development state
Currently, basic and fully working implementation of the worker-based engine
is implemented. But it is not recommended to use it in the production yet due to the following issues that are needed to be resolved:
- Collecting information from workers. Currently, it is necessary to specify worker topics and their tasks in the engine configuration. But this information should be collected from workers automatically using some kind of broadcast messages. This will resolve problem with adding new workers on the fly too.
- Engine failover. Currently, worker send task results without getting confirmation from the engine that it was delivered. In case when task is being performed on worker and engine failed unexpectedly, worker will send results to engine that won't get it. This may lead to some problems.
- Worker failover. Currently, if worker started task execution and then failed unexpectedly, executor will wait forever until task is finished. This behavior may lead to flow hanging.
Some kind of keep-alive messages should be sent from worker, so executor would know that task is still being performed. Otherwise, mark request as failed.