TaskFlow/Worker-based Engine

Communication protocol

The communication protocol between the worker-based engine and its workers is described on a separate page.

Usage

Let's consider how the worker-based engine can be applied. First, a worker and a flow need to be configured for use with the worker-based engine. Once configuration is done, the worker can be started and the flow can be run.

Worker configuration

Format
  • url - the broker connection URL (see the kombu documentation for the format);
  • exchange - the name of the exchange used for executor/worker communication;
  • topic - the topic name under which workers with the same set of tasks are started;
  • tasks - the list of tasks the worker is able to perform;
  • transport - can be used to specify the transport explicitly (for the memory and filesystem transports);
  • transport_options - transport options (e.g. the data_folder_in and data_folder_out parameters for the filesystem transport).
Example with AMQP transport
from taskflow.engines.worker_based import worker as w

config = {
    'url': 'amqp://guest:guest@localhost:5672//',
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2']
}
worker = w.Worker(**config)
worker.run()
Example with filesystem transport
from taskflow.engines.worker_based import worker as w

config = {
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
    'transport': 'filesystem',
    'transport_options': {
        'data_folder_in': '/tmp/test',
        'data_folder_out': '/tmp/test'
    }
}
worker = w.Worker(**config)
worker.run()
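
The strings in the tasks list use a module:ClassName notation: the worker imports the named module and looks up each task class in it. As a rough sketch (the class bodies below are illustrative assumptions, not taken from this page), a tasks.py module defining the TestTask1 and TestTask2 classes referenced above could look like this:

# tasks.py - sketch of a task module; the bodies are illustrative only
from taskflow import task

class TestTask1(task.Task):
    def execute(self):
        # a real task would do useful work here and could return results
        print('executing %s' % self.name)

class TestTask2(task.Task):
    def execute(self):
        print('executing %s' % self.name)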

Flow configuration

Format
  • engine - the engine type to be used ('worker-based' in our case);
  • url - the broker connection URL (see the kombu documentation for the format);
  • exchange - the name of the exchange used for executor/worker communication;
  • transport - can be used to specify the transport explicitly (for the memory and filesystem transports);
  • transport_options - transport options (e.g. the data_folder_in and data_folder_out parameters for the filesystem transport);
  • workers_info - information about the available workers (a topic-to-tasks map).
Example with AMQP transport
...
engine_conf = {
    'engine': 'worker-based',
    'url': 'amqp://guest:guest@localhost:5672//',
    'exchange': 'test-exchange',
    'workers_info': {
        'test-tasks': ['tasks:TestTask1', 'tasks:TestTask2']
    }
}
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()
Example with filesystem transport
...
engine_conf = {
    'engine': 'worker-based',
    'exchange': 'test-exchange',
    'transport': 'filesystem',
    'transport_options': {
        'data_folder_in': '/tmp/test',
        'data_folder_out': '/tmp/test'
    },
    'workers_info': {
        'test-tasks': ['tasks:TestTask1', 'tasks:TestTask2']
    }
}
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()
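
For completeness, the elided parts of the flow examples above could be filled in roughly as follows; the imports and the flow contents are assumptions (building on the hypothetical tasks module sketched earlier), not code from this page:

# sketch of a complete flow-side script (AMQP transport)
import taskflow.engines
from taskflow.patterns import linear_flow as lf

from tasks import TestTask1, TestTask2  # the module the workers serve

engine_conf = {
    'engine': 'worker-based',
    'url': 'amqp://guest:guest@localhost:5672//',
    'exchange': 'test-exchange',
    'workers_info': {
        'test-tasks': ['tasks:TestTask1', 'tasks:TestTask2']
    }
}

flow = lf.Flow('simple-linear').add(TestTask1(), TestTask2())
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()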

Development state

A basic but fully working implementation of the worker-based engine now exists. However, it is not yet recommended for production use, because the following issues still need to be resolved:

  • Collecting information from workers. Currently, worker topics and their tasks must be specified in the engine configuration. This information should instead be collected from the workers automatically, using some kind of broadcast message; that would also solve the problem of adding new workers on the fly.
  • Engine failover. Currently, a worker sends task results without getting confirmation from the engine that they were delivered. If the engine fails unexpectedly while a task is being performed on a worker, the worker will send results that the engine never receives. This may lead to problems.
  • Worker failover. Currently, if a worker starts executing a task and then fails unexpectedly, the executor will wait forever for the task to finish. This behavior may leave the flow hanging. Some kind of keep-alive message should be sent by the worker, so the executor knows the task is still being performed; otherwise, the request should be marked as failed.
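
As an illustration only (a hypothetical sketch, not anything the current implementation provides), such a keep-alive could be a background thread on the worker that periodically publishes heartbeat messages on the shared exchange; the message format, function name, and parameters here are assumptions:

# hypothetical worker-side keep-alive sketch; not part of TaskFlow
import threading
import time

from kombu import Connection, Exchange

def start_heartbeat(url, exchange_name, topic, interval=30):
    def _beat():
        with Connection(url) as conn:
            producer = conn.Producer()
            exchange = Exchange(exchange_name)
            while True:
                # tell the executor this worker is still alive
                producer.publish({'type': 'heartbeat', 'topic': topic},
                                 exchange=exchange, routing_key=topic)
                time.sleep(interval)
    thread = threading.Thread(target=_beat)
    thread.daemon = True
    thread.start()
    return thread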