
TaskFlow/Worker-based Engine

Revision as of 19:49, 28 April 2014 by Harlowja (Development state)

Worker-based engine

Communication protocol

The worker-based engine communication protocol is described here.

Usage

Let's consider how the worker-based engine can be applied. First, the worker and the flow must be configured for use with the worker-based engine. Once they are configured, the worker and the flow can be started.

Worker configuration

Format
  • url - broker connection URL (see the format in the kombu documentation);
  • exchange - name of the exchange over which executor / worker communication is performed;
  • topic - topic name under which workers with the same set of tasks are started;
  • tasks - list of tasks the worker is able to perform;
  • transport - can be used to specify the transport explicitly (see available transports);
  • transport_options - transport-specific options (for instance, the data_folder_in and data_folder_out parameters for the filesystem transport).
Example with AMQP transport
from taskflow.engines.worker_based import worker as w

config = {
    'url': 'amqp://guest:guest@localhost:5672//',
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2']
}
worker = w.Worker(**config)
worker.run()
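
The entries in 'tasks' above look like 'module:ClassName' strings. As a rough illustration of how such a string might be turned into a class, the sketch below uses only the standard library. resolve_task is a hypothetical helper, not part of TaskFlow, and the worker itself may resolve task names differently.

```python
import importlib


def resolve_task(spec):
    """Resolve a 'module:ClassName' string to the named attribute.

    Hypothetical helper for illustration; not TaskFlow's own loader.
    """
    module_name, sep, attr_name = spec.partition(':')
    if not sep or not module_name or not attr_name:
        raise ValueError("expected 'module:ClassName', got %r" % (spec,))
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# A standard-library class stands in here, because the 'tasks' module
# from the example above is not shown on this page.
ordered_dict_cls = resolve_task('collections:OrderedDict')
```

With a real tasks module on the import path, resolve_task('tasks:TestTask1') would return the TestTask1 class under the same assumption about the string format.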
Example with filesystem transport
from taskflow.engines.worker_based import worker as w

config = {
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
    'transport': 'filesystem',
    'transport_options': {
        'data_folder_in': '/tmp/test',
        'data_folder_out': '/tmp/test'
    }
}
worker = w.Worker(**config)
worker.run()
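
Both worker examples set 'exchange', 'topic' and 'tasks', while 'url' appears only in the AMQP example and 'transport' only in the filesystem one. A small check run before the dict is passed to the worker can catch typos early. The sketch below is an assumption based on those two examples only: check_worker_config and its rules (including the rule that at least one of 'url' or 'transport' should be set) are hypothetical and do not come from TaskFlow.

```python
# Keys seen in both examples are treated as required (an assumption).
REQUIRED_KEYS = ('exchange', 'topic', 'tasks')
OPTIONAL_KEYS = ('url', 'transport', 'transport_options')


def check_worker_config(config):
    """Return a list of problems found in a worker config dict."""
    problems = []
    for key in REQUIRED_KEYS:
        if key not in config:
            problems.append('missing key: %s' % key)
    for key in config:
        if key not in REQUIRED_KEYS + OPTIONAL_KEYS:
            problems.append('unknown key: %s' % key)
    # Assumed rule: the worker needs some way to locate its transport.
    if 'url' not in config and 'transport' not in config:
        problems.append("neither 'url' nor 'transport' is set")
    if not isinstance(config.get('tasks', []), (list, tuple)):
        problems.append("'tasks' should be a list")
    return problems


fs_config = {
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
    'transport': 'filesystem',
    'transport_options': {
        'data_folder_in': '/tmp/test',
        'data_folder_out': '/tmp/test'
    }
}
fs_problems = check_worker_config(fs_config)
```

An empty list means the dict has the same shape as the examples above; it says nothing about whether the broker or the folders are actually reachable.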

Flow configuration

Format
  • engine - engine type to be used ('worker-based' in our case);
  • url - broker connection URL (see the format in the kombu documentation);
  • exchange - name of the exchange over which executor / worker communication is performed;
  • topics - list of worker topics to communicate with;
  • transport - can be used to specify the transport explicitly (see available transports);
  • transport_options - transport-specific options (for instance, the data_folder_in and data_folder_out parameters for the filesystem transport).
Example with AMQP transport
...
engine_conf = {
    'engine': 'worker-based',
    'url': 'amqp://guest:guest@localhost:5672//',
    'exchange': 'test-exchange',
    'topics': ['topic1', 'topic2']
}
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()
Example with filesystem transport
...
engine_conf = {
    'engine': 'worker-based',
    'exchange': 'test-exchange',
    'topics': ['topic1', 'topic2'],
    'transport': 'filesystem',
    'transport_options': {
        'data_folder_in': '/tmp/test',
        'data_folder_out': '/tmp/test'
    }
}
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()
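
Going by the option descriptions, an engine and a worker can only talk to each other when they share an exchange and the worker's 'topic' is one of the engine's 'topics'. The sketch below checks that relationship on plain dicts. reachable_workers is a hypothetical helper, and the matching rule is an inference from the descriptions above rather than documented TaskFlow behaviour.

```python
def reachable_workers(engine_conf, worker_configs):
    """Return the worker configs that the engine configuration addresses.

    Assumption: a worker is addressed when it uses the same exchange and
    its topic is listed in the engine's 'topics'.
    """
    topics = set(engine_conf.get('topics', ()))
    return [
        worker for worker in worker_configs
        if worker.get('exchange') == engine_conf.get('exchange')
        and worker.get('topic') in topics
    ]


engine_conf = {
    'engine': 'worker-based',
    'exchange': 'test-exchange',
    'topics': ['topic1', 'topic2'],
}
workers = [
    {'exchange': 'test-exchange', 'topic': 'topic1'},    # matches
    {'exchange': 'test-exchange', 'topic': 'other'},     # topic not listed
    {'exchange': 'other-exchange', 'topic': 'topic2'},   # different exchange
]
matched = reachable_workers(engine_conf, workers)
```

Here only the first worker is selected. Note that the worker examples earlier on this page use the topic 'test-tasks', which would need to appear in 'topics' for this engine configuration to address them under the same assumption.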