Difference between revisions of "TaskFlow/Worker-based Engine"
< TaskFlow
(→Worker-based engine) |
(→Development state) |
||
(6 intermediate revisions by one other user not shown) | |||
Line 1: | Line 1: | ||
== Worker-based engine == | == Worker-based engine == | ||
=== Communication protocol === | === Communication protocol === | ||
+ | Worker-based engine communication protocol is described [[TaskFlow/Worker-based_Engine_Protocol|here]]. | ||
=== Usage === | === Usage === | ||
+ | Let's consider how the <code>worker-based engine</code> can be applied. First, it is needed to configure <code>worker</code> and <code>flow</code> to be used with the <code>worker-based engine</code>. After configuration is done, <code>worker</code> and <code>flow</code> can be started. | ||
− | === | + | ==== Worker configuration==== |
− | + | ||
− | * | + | ===== Format ===== |
− | + | * <code>url</code> - broker connection url (see format in kombu [https://kombu.readthedocs.org/en/latest/reference/kombu.html#connection documentation]); | |
+ | * <code>exchange</code> - exchange name in which <code>executor</code> / <code>worker</code> communication is performed; | ||
+ | * <code>topic</code> - topic name under which workers with the same set of tasks are started; | ||
+ | * <code>tasks</code> - list of tasks worker is able to perform; | ||
+ | * <code>transport</code> - can be used to specify the transport explicitly (see available [http://kombu.readthedocs.org/en/latest/userguide/connections.html#amqp-transports transports]); | ||
+ | * <code>transport_options</code> - transport specific options (for instance, <code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code> transport). | ||
+ | |||
+ | ===== Example with AMQP transport ===== | ||
+ | <pre> | ||
+ | from taskflow.engines.worker_based import worker as w | ||
+ | |||
+ | config = { | ||
+ | 'url': 'amqp://guest:guest@localhost:5672//' | ||
+ | 'exchange': 'test-exchange', | ||
+ | 'topic': 'test-tasks', | ||
+ | 'tasks': ['tasks:TestTask1', 'tasks:TestTask2'] | ||
+ | } | ||
+ | worker = w.Worker(**config) | ||
+ | worker.run() | ||
+ | </pre> | ||
+ | |||
+ | ===== Example filesystem transport ===== | ||
+ | <pre> | ||
+ | from taskflow.engines.worker_based import worker as w | ||
+ | |||
+ | config = { | ||
+ | 'exchange': 'test-exchange', | ||
+ | 'topic': 'test-tasks', | ||
+ | 'tasks': ['tasks:TestTask1', 'tasks:TestTask2'], | ||
+ | 'transport': 'filesystem', | ||
+ | 'transport_options': { | ||
+ | 'data_folder_in': '/tmp/test', | ||
+ | 'data_folder_out': '/tmp/test' | ||
+ | } | ||
+ | } | ||
+ | worker = w.Worker(**config) | ||
+ | worker.run() | ||
+ | </pre> | ||
+ | |||
+ | ==== Flow configuration ==== | ||
+ | |||
+ | ===== Format ===== | ||
+ | * <code>engine</code> - engine type to be used ('worker-based' in our case); | ||
+ | * <code>url</code> - broker connection url (see format in kombu [https://kombu.readthedocs.org/en/latest/reference/kombu.html#connection documentation]); | ||
+ | * <code>exchange</code> - exchange name in which <code>executor</code> / <code>worker</code> communication is performed; | ||
+ | * <code>topics</code> - list of workers topics to communicate with; | ||
+ | * <code>transport</code> - can be used to specify the transport explicitly (see available [http://kombu.readthedocs.org/en/latest/userguide/connections.html#amqp-transports transports]); | ||
+ | * <code>transport_options</code> - transport specific options (for instance, <code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code> transport). | ||
+ | |||
+ | ===== Example with AMQP transport ===== | ||
+ | <pre> | ||
+ | ... | ||
+ | engine_conf = { | ||
+ | 'engine': 'worker-based', | ||
+ | 'url': 'amqp://guest:guest@localhost:5672//' | ||
+ | 'exchange': 'test-exchange', | ||
+ | 'topics': ['topic1', 'topic2'] | ||
+ | } | ||
+ | flow = lf.Flow('simple-linear').add(...) | ||
+ | eng = taskflow.engines.load(flow, engine_conf=engine_conf) | ||
+ | eng.run() | ||
+ | </pre> | ||
+ | |||
+ | ===== Example with filesystem transport ===== | ||
+ | <pre> | ||
+ | ... | ||
+ | engine_conf = { | ||
+ | 'engine': 'worker-based', | ||
+ | 'exchange': 'test-exchange', | ||
+ | 'topics': ['topic1', 'topic2'], | ||
+ | 'transport': 'filesystem', | ||
+ | 'transport_options': { | ||
+ | 'data_folder_in': '/tmp/test', | ||
+ | 'data_folder_out': '/tmp/test' | ||
+ | } | ||
+ | } | ||
+ | flow = lf.Flow('simple-linear').add(...) | ||
+ | eng = taskflow.engines.load(flow, engine_conf=engine_conf) | ||
+ | eng.run() | ||
+ | </pre> |
Latest revision as of 19:49, 28 April 2014
Contents
Worker-based engine
Communication protocol
Worker-based engine communication protocol is described here.
Usage
Let's consider how the worker-based engine
can be applied. First, it is needed to configure worker
and flow
to be used with the worker-based engine
. After configuration is done, worker
and flow
can be started.
Worker configuration
Format
-
url
- broker connection url (see format in kombu documentation); -
exchange
- exchange name in whichexecutor
/worker
communication is performed; -
topic
- topic name under which workers with the same set of tasks are started; -
tasks
- list of tasks worker is able to perform; -
transport
- can be used to specify the transport explicitly (see available transports); -
transport_options
- transport specific options (for instance,data_folder_in
anddata_folder_out
parameters for thefilesystem
transport).
Example with AMQP transport
from taskflow.engines.worker_based import worker as w config = { 'url': 'amqp://guest:guest@localhost:5672//' 'exchange': 'test-exchange', 'topic': 'test-tasks', 'tasks': ['tasks:TestTask1', 'tasks:TestTask2'] } worker = w.Worker(**config) worker.run()
Example filesystem transport
from taskflow.engines.worker_based import worker as w config = { 'exchange': 'test-exchange', 'topic': 'test-tasks', 'tasks': ['tasks:TestTask1', 'tasks:TestTask2'], 'transport': 'filesystem', 'transport_options': { 'data_folder_in': '/tmp/test', 'data_folder_out': '/tmp/test' } } worker = w.Worker(**config) worker.run()
Flow configuration
Format
-
engine
- engine type to be used ('worker-based' in our case); -
url
- broker connection url (see format in kombu documentation); -
exchange
- exchange name in whichexecutor
/worker
communication is performed; -
topics
- list of workers topics to communicate with; -
transport
- can be used to specify the transport explicitly (see available transports); -
transport_options
- transport specific options (for instance,data_folder_in
anddata_folder_out
parameters for thefilesystem
transport).
Example with AMQP transport
... engine_conf = { 'engine': 'worker-based', 'url': 'amqp://guest:guest@localhost:5672//' 'exchange': 'test-exchange', 'topics': ['topic1', 'topic2'] } flow = lf.Flow('simple-linear').add(...) eng = taskflow.engines.load(flow, engine_conf=engine_conf) eng.run()
Example with filesystem transport
... engine_conf = { 'engine': 'worker-based', 'exchange': 'test-exchange', 'topics': ['topic1', 'topic2'], 'transport': 'filesystem', 'transport_options': { 'data_folder_in': '/tmp/test', 'data_folder_out': '/tmp/test' } } flow = lf.Flow('simple-linear').add(...) eng = taskflow.engines.load(flow, engine_conf=engine_conf) eng.run()