Difference between revisions of "TaskFlow/Worker-based Engine"
< TaskFlow
(→Development state) |
|||
(5 intermediate revisions by one other user not shown) | |||
Line 4: | Line 4: | ||
=== Usage === | === Usage === | ||
− | Let's consider how the <code>worker-based engine</code> can be applied. First, it is needed to configure <code>worker</code> and <code>flow</code> to be used with the <code>worker-based engine</code>. After configuration is done, <code>worker</code> | + | Let's consider how the <code>worker-based engine</code> can be applied. First, it is needed to configure <code>worker</code> and <code>flow</code> to be used with the <code>worker-based engine</code>. After configuration is done, <code>worker</code> and <code>flow</code> can be started. |
==== Worker configuration==== | ==== Worker configuration==== | ||
Line 13: | Line 13: | ||
* <code>topic</code> - topic name under which workers with the same set of tasks are started; | * <code>topic</code> - topic name under which workers with the same set of tasks are started; | ||
* <code>tasks</code> - list of tasks worker is able to perform; | * <code>tasks</code> - list of tasks worker is able to perform; | ||
− | * <code>transport</code> - can be used to specify the transport explicitly ( | + | * <code>transport</code> - can be used to specify the transport explicitly (see available [http://kombu.readthedocs.org/en/latest/userguide/connections.html#amqp-transports transports]); |
− | * <code>transport_options</code> - transport options (<code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code>) | + | * <code>transport_options</code> - transport specific options (for instance, <code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code> transport). |
===== Example with AMQP transport ===== | ===== Example with AMQP transport ===== | ||
Line 54: | Line 54: | ||
* <code>url</code> - broker connection url (see format in kombu [https://kombu.readthedocs.org/en/latest/reference/kombu.html#connection documentation]); | * <code>url</code> - broker connection url (see format in kombu [https://kombu.readthedocs.org/en/latest/reference/kombu.html#connection documentation]); | ||
* <code>exchange</code> - exchange name in which <code>executor</code> / <code>worker</code> communication is performed; | * <code>exchange</code> - exchange name in which <code>executor</code> / <code>worker</code> communication is performed; | ||
− | * <code>transport</code> - can be used to specify the transport explicitly ( | + | * <code>topics</code> - list of workers topics to communicate with; |
− | * <code>transport_options</code> - transport options (<code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code> | + | * <code>transport</code> - can be used to specify the transport explicitly (see available [http://kombu.readthedocs.org/en/latest/userguide/connections.html#amqp-transports transports]); |
− | + | * <code>transport_options</code> - transport specific options (for instance, <code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code> transport). | |
===== Example with AMQP transport ===== | ===== Example with AMQP transport ===== | ||
Line 65: | Line 65: | ||
'url': 'amqp://guest:guest@localhost:5672//' | 'url': 'amqp://guest:guest@localhost:5672//' | ||
'exchange': 'test-exchange', | 'exchange': 'test-exchange', | ||
− | ' | + | 'topics': ['topic1', 'topic2'] |
− | |||
− | |||
} | } | ||
flow = lf.Flow('simple-linear').add(...) | flow = lf.Flow('simple-linear').add(...) | ||
Line 80: | Line 78: | ||
'engine': 'worker-based', | 'engine': 'worker-based', | ||
'exchange': 'test-exchange', | 'exchange': 'test-exchange', | ||
+ | 'topics': ['topic1', 'topic2'], | ||
'transport': 'filesystem', | 'transport': 'filesystem', | ||
'transport_options': { | 'transport_options': { | ||
Line 85: | Line 84: | ||
'data_folder_out': '/tmp/test' | 'data_folder_out': '/tmp/test' | ||
} | } | ||
− | |||
− | |||
− | |||
− | |||
} | } | ||
flow = lf.Flow('simple-linear').add(...) | flow = lf.Flow('simple-linear').add(...) | ||
Line 94: | Line 89: | ||
eng.run() | eng.run() | ||
</pre> | </pre> | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− |
Latest revision as of 19:49, 28 April 2014
Contents
Worker-based engine
Communication protocol
Worker-based engine communication protocol is described here.
Usage
Let's consider how the worker-based engine
can be applied. First, it is needed to configure worker
and flow
to be used with the worker-based engine
. After configuration is done, worker
and flow
can be started.
Worker configuration
Format
-
url
- broker connection url (see format in kombu documentation); -
exchange
- exchange name in whichexecutor
/worker
communication is performed; -
topic
- topic name under which workers with the same set of tasks are started; -
tasks
- list of tasks worker is able to perform; -
transport
- can be used to specify the transport explicitly (see available transports); -
transport_options
- transport specific options (for instance,data_folder_in
anddata_folder_out
parameters for thefilesystem
transport).
Example with AMQP transport
from taskflow.engines.worker_based import worker as w config = { 'url': 'amqp://guest:guest@localhost:5672//' 'exchange': 'test-exchange', 'topic': 'test-tasks', 'tasks': ['tasks:TestTask1', 'tasks:TestTask2'] } worker = w.Worker(**config) worker.run()
Example filesystem transport
from taskflow.engines.worker_based import worker as w config = { 'exchange': 'test-exchange', 'topic': 'test-tasks', 'tasks': ['tasks:TestTask1', 'tasks:TestTask2'], 'transport': 'filesystem', 'transport_options': { 'data_folder_in': '/tmp/test', 'data_folder_out': '/tmp/test' } } worker = w.Worker(**config) worker.run()
Flow configuration
Format
-
engine
- engine type to be used ('worker-based' in our case); -
url
- broker connection url (see format in kombu documentation); -
exchange
- exchange name in whichexecutor
/worker
communication is performed; -
topics
- list of workers topics to communicate with; -
transport
- can be used to specify the transport explicitly (see available transports); -
transport_options
- transport specific options (for instance,data_folder_in
anddata_folder_out
parameters for thefilesystem
transport).
Example with AMQP transport
... engine_conf = { 'engine': 'worker-based', 'url': 'amqp://guest:guest@localhost:5672//' 'exchange': 'test-exchange', 'topics': ['topic1', 'topic2'] } flow = lf.Flow('simple-linear').add(...) eng = taskflow.engines.load(flow, engine_conf=engine_conf) eng.run()
Example with filesystem transport
... engine_conf = { 'engine': 'worker-based', 'exchange': 'test-exchange', 'topics': ['topic1', 'topic2'], 'transport': 'filesystem', 'transport_options': { 'data_folder_in': '/tmp/test', 'data_folder_out': '/tmp/test' } } flow = lf.Flow('simple-linear').add(...) eng = taskflow.engines.load(flow, engine_conf=engine_conf) eng.run()