TaskFlow/Worker-based Engine
< TaskFlow(Redirected from TaskFlow/Worker-based engine)
Contents
Worker-based engine
Communication protocol
Worker-based engine communication protocol is described here.
Usage
Let's consider how the worker-based engine can be applied. First, it is needed to configure worker and flow to be used with the worker-based engine. After configuration is done, worker and flow can be started.
Worker configuration
Format
-
url- broker connection url (see format in kombu documentation); -
exchange- exchange name in whichexecutor/workercommunication is performed; -
topic- topic name under which workers with the same set of tasks are started; -
tasks- list of tasks worker is able to perform; -
transport- can be used to specify the transport explicitly (see available transports); -
transport_options- transport specific options (for instance,data_folder_inanddata_folder_outparameters for thefilesystemtransport).
Example with AMQP transport
from taskflow.engines.worker_based import worker as w
config = {
'url': 'amqp://guest:guest@localhost:5672//'
'exchange': 'test-exchange',
'topic': 'test-tasks',
'tasks': ['tasks:TestTask1', 'tasks:TestTask2']
}
worker = w.Worker(**config)
worker.run()
Example filesystem transport
from taskflow.engines.worker_based import worker as w
config = {
'exchange': 'test-exchange',
'topic': 'test-tasks',
'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
'transport': 'filesystem',
'transport_options': {
'data_folder_in': '/tmp/test',
'data_folder_out': '/tmp/test'
}
}
worker = w.Worker(**config)
worker.run()
Flow configuration
Format
-
engine- engine type to be used ('worker-based' in our case); -
url- broker connection url (see format in kombu documentation); -
exchange- exchange name in whichexecutor/workercommunication is performed; -
topics- list of workers topics to communicate with; -
transport- can be used to specify the transport explicitly (see available transports); -
transport_options- transport specific options (for instance,data_folder_inanddata_folder_outparameters for thefilesystemtransport).
Example with AMQP transport
...
engine_conf = {
'engine': 'worker-based',
'url': 'amqp://guest:guest@localhost:5672//'
'exchange': 'test-exchange',
'topics': ['topic1', 'topic2']
}
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()
Example with filesystem transport
...
engine_conf = {
'engine': 'worker-based',
'exchange': 'test-exchange',
'topics': ['topic1', 'topic2'],
'transport': 'filesystem',
'transport_options': {
'data_folder_in': '/tmp/test',
'data_folder_out': '/tmp/test'
}
}
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()