TaskFlow/Worker-based Engine
Worker-based engine
Communication protocol
The worker-based engine communication protocol is described here.
Usage
Let's consider how the worker-based engine can be applied. First, the worker and the flow need to be configured for use with the worker-based engine. Once configuration is done, the worker and the flow can be started.
Worker configuration
Format
- url - broker connection URL (see format in kombu documentation);
- exchange - name of the exchange in which executor/worker communication is performed;
- topic - topic name under which workers with the same set of tasks are started;
- tasks - list of tasks the worker is able to perform;
- transport - can be used to specify the transport explicitly (see available transports);
- transport_options - transport-specific options (for instance, the data_folder_in and data_folder_out parameters for the filesystem transport).
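Since a misspelled option name (for example `topics` instead of `topic`) is easy to miss, a small pre-flight check can help. The sketch below is a hypothetical helper, not part of TaskFlow: `check_worker_config` and `KNOWN_WORKER_OPTIONS` are names invented here, and the only rules it applies are the option names and value shapes listed above.

```python
# Hypothetical helper, NOT part of TaskFlow: sanity-checks a worker config
# dict against the option names documented in the list above.
KNOWN_WORKER_OPTIONS = frozenset([
    'url', 'exchange', 'topic', 'tasks', 'transport', 'transport_options',
])


def check_worker_config(config):
    """Return a list of human-readable problems found in ``config``."""
    problems = []
    # Any key outside the documented set is most likely a typo.
    unknown = sorted(set(config) - KNOWN_WORKER_OPTIONS)
    if unknown:
        problems.append('unknown options: ' + ', '.join(unknown))
    # 'tasks' is described as a list of tasks.
    if 'tasks' in config and not isinstance(config['tasks'], (list, tuple)):
        problems.append('tasks should be a list')
    # 'transport_options' holds transport-specific key/value pairs.
    if ('transport_options' in config
            and not isinstance(config['transport_options'], dict)):
        problems.append('transport_options should be a dict')
    return problems


if __name__ == '__main__':
    config = {
        'exchange': 'test-exchange',
        'topics': 'test-tasks',  # typo: the worker option is 'topic'
        'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
    }
    for problem in check_worker_config(config):
        print(problem)
```

An empty list means nothing suspicious was found; the check says nothing about whether the broker is reachable or the tasks can be imported.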
Example with AMQP transport
    from taskflow.engines.worker_based import worker as w

    config = {
        # Broker to connect to (kombu URL format).
        'url': 'amqp://guest:guest@localhost:5672//',
        'exchange': 'test-exchange',
        'topic': 'test-tasks',
        # Tasks this worker is able to perform.
        'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
    }
    worker = w.Worker(**config)
    worker.run()
Example with filesystem transport
    from taskflow.engines.worker_based import worker as w

    config = {
        'exchange': 'test-exchange',
        'topic': 'test-tasks',
        'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
        # No broker URL here: messages are exchanged through files.
        'transport': 'filesystem',
        'transport_options': {
            'data_folder_in': '/tmp/test',
            'data_folder_out': '/tmp/test',
        },
    }
    worker = w.Worker(**config)
    worker.run()
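The task entries in the examples above, such as 'tasks:TestTask1', read as a module name and a class name separated by a colon. Assuming that shape, the sketch below shows one way such a string could be turned into a class using only the standard library. `resolve_task` is a hypothetical name, and this is not necessarily how TaskFlow resolves the entries internally.

```python
import importlib


def resolve_task(name):
    """Resolve a 'module:ClassName' string to the object it names.

    Hypothetical helper for illustration; assumes the colon-separated
    form used in the worker examples.
    """
    module_name, sep, class_name = name.partition(':')
    if not sep or not module_name or not class_name:
        raise ValueError("expected 'module:ClassName', got %r" % (name,))
    # Import the module part, then look the class up as an attribute.
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


if __name__ == '__main__':
    # A standard-library class stands in for a real task class here.
    cls = resolve_task('collections:OrderedDict')
    print(cls.__name__)
```

Running a check like this over the `tasks` list before starting the worker surfaces import errors early, on the worker host rather than when a flow first requests the task.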
Flow configuration
Format
- engine - engine type to be used ('worker-based' in our case);
- url - broker connection URL (see format in kombu documentation);
- exchange - name of the exchange in which executor/worker communication is performed;
- topics - list of worker topics to communicate with;
- transport - can be used to specify the transport explicitly (see available transports);
- transport_options - transport-specific options (for instance, the data_folder_in and data_folder_out parameters for the filesystem transport).
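The flow's `topics` option is the counterpart of the worker's `topic` option. As a sketch of how the two sides line up, the hypothetical helper below (not part of TaskFlow) reports engine topics for which no worker configuration exists. It assumes that a worker only counts if it uses the same exchange name as the engine.

```python
def unmatched_topics(engine_conf, worker_configs):
    """Return engine topics that no listed worker serves.

    Hypothetical helper; a worker is assumed to serve a topic only when
    its 'exchange' equals the engine's 'exchange'.
    """
    exchange = engine_conf.get('exchange')
    served = set(
        w['topic'] for w in worker_configs
        if w.get('exchange') == exchange and 'topic' in w
    )
    # Preserve the order in which the engine lists its topics.
    return [t for t in engine_conf.get('topics', []) if t not in served]


if __name__ == '__main__':
    engine_conf = {
        'engine': 'worker-based',
        'exchange': 'test-exchange',
        'topics': ['topic1', 'topic2'],
    }
    workers = [
        {'exchange': 'test-exchange', 'topic': 'topic1', 'tasks': []},
        {'exchange': 'other-exchange', 'topic': 'topic2', 'tasks': []},
    ]
    print(unmatched_topics(engine_conf, workers))
```

In this usage example 'topic2' is reported, because the only worker with that topic is configured for a different exchange.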
Example with AMQP transport
    ...

    engine_conf = {
        'engine': 'worker-based',
        'url': 'amqp://guest:guest@localhost:5672//',
        'exchange': 'test-exchange',
        # Topics of the workers this engine will talk to.
        'topics': ['topic1', 'topic2'],
    }
    flow = lf.Flow('simple-linear').add(...)
    eng = taskflow.engines.load(flow, engine_conf=engine_conf)
    eng.run()
Example with filesystem transport
    ...

    engine_conf = {
        'engine': 'worker-based',
        'exchange': 'test-exchange',
        'topics': ['topic1', 'topic2'],
        'transport': 'filesystem',
        'transport_options': {
            'data_folder_in': '/tmp/test',
            'data_folder_out': '/tmp/test',
        },
    }
    flow = lf.Flow('simple-linear').add(...)
    eng = taskflow.engines.load(flow, engine_conf=engine_conf)
    eng.run()