
Difference between revisions of "TaskFlow/Worker-based Engine"

 
=== Development state ===
 
A fully working version of the <code>worker-based engine</code> is currently implemented. However, it is not yet recommended for production use because the following issues still need to be resolved:
 
* '''Engine failover'''. Currently, a worker sends task results without receiving confirmation from the engine that they were delivered. If the engine fails unexpectedly while a task is being performed on a worker, the worker will send the results to an engine that will never receive them. This may lead to execution problems.
 
* '''Worker failover'''. Currently, if a worker starts executing a task and then fails unexpectedly, the executor will wait forever for the task to finish. This behavior may cause the flow to hang. The worker should send some kind of keep-alive messages so that the executor knows the task is still being performed; if they stop arriving, the executor should mark the request as failed.
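
The keep-alive idea from the worker failover item can be sketched as follows. This is only an illustration of the bookkeeping an executor might do, not the engine's actual implementation: the class name <code>KeepAliveMonitor</code>, its methods, and the timeout value are all hypothetical, and message transport is left out entirely.

```python
import time


class KeepAliveMonitor:
    """Tracks the last keep-alive seen for each in-flight task request.

    Hypothetical helper: none of these names come from TaskFlow.
    """

    def __init__(self, timeout, clock=time.monotonic):
        self._timeout = timeout   # seconds of silence that are tolerated
        self._clock = clock       # injectable so the logic can be tested
        self._last_seen = {}      # request id -> time of last keep-alive

    def start(self, request_id):
        # Called when the executor hands a request to a worker.
        self._last_seen[request_id] = self._clock()

    def keep_alive(self, request_id):
        # Called whenever a keep-alive for a known request arrives.
        if request_id in self._last_seen:
            self._last_seen[request_id] = self._clock()

    def finish(self, request_id):
        # Called when the worker delivers the task result.
        self._last_seen.pop(request_id, None)

    def expired(self):
        # Requests whose worker has been silent for longer than the timeout.
        # The executor would mark these as failed instead of waiting forever.
        now = self._clock()
        return sorted(rid for rid, seen in self._last_seen.items()
                      if now - seen > self._timeout)
```

In such a design the executor would call <code>expired()</code> periodically and fail the returned requests, which addresses the hang described above but not the engine failover issue.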
 

Latest revision as of 19:49, 28 April 2014

Worker-based engine

Communication protocol

Worker-based engine communication protocol is described here.

Usage

Let's consider how the worker-based engine can be applied. First, the worker and the flow must be configured for use with the worker-based engine. Once configuration is done, the worker and the flow can be started.

Worker configuration

Format
  • url - broker connection URL (see the format in the kombu documentation);
  • exchange - name of the exchange over which executor / worker communication is performed;
  • topic - topic name under which workers with the same set of tasks are started;
  • tasks - list of tasks the worker is able to perform;
  • transport - can be used to specify the transport explicitly (see available transports);
  • transport_options - transport-specific options (for instance, the data_folder_in and data_folder_out parameters for the filesystem transport).
Example with AMQP transport
from taskflow.engines.worker_based import worker as w

config = {
    'url': 'amqp://guest:guest@localhost:5672//',
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2']
}
worker = w.Worker(**config)
worker.run()
Example with filesystem transport
from taskflow.engines.worker_based import worker as w

config = {
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
    'transport': 'filesystem',
    'transport_options': {
        'data_folder_in': '/tmp/test',
        'data_folder_out': '/tmp/test'
    }
}
worker = w.Worker(**config)
worker.run()
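
The entries in 'tasks' above have the shape 'module:ClassName'. How the worker resolves them internally is not described on this page; the following is only a sketch of one way such a string could be turned into a class using the standard importlib module. The function name resolve_task is hypothetical and is not part of TaskFlow.

```python
import importlib


def resolve_task(name):
    """Resolve a 'module:ClassName' string to the object it names.

    Illustrative only; this is an assumption about the string format,
    not the worker's actual lookup code.
    """
    module_name, sep, attr = name.partition(':')
    if not sep or not module_name or not attr:
        raise ValueError("expected 'module:ClassName', got %r" % name)
    # Import the module part, then look the class up as an attribute.
    module = importlib.import_module(module_name)
    return getattr(module, attr)
```

Under this reading, 'tasks:TestTask1' would refer to a class TestTask1 defined in a module named tasks that is importable where the worker runs.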

Flow configuration

Format
  • engine - engine type to be used ('worker-based' in our case);
  • url - broker connection URL (see the format in the kombu documentation);
  • exchange - name of the exchange over which executor / worker communication is performed;
  • topics - list of worker topics to communicate with;
  • transport - can be used to specify the transport explicitly (see available transports);
  • transport_options - transport-specific options (for instance, the data_folder_in and data_folder_out parameters for the filesystem transport).
Example with AMQP transport
...
engine_conf = {
    'engine': 'worker-based',
    'url': 'amqp://guest:guest@localhost:5672//',
    'exchange': 'test-exchange',
    'topics': ['topic1', 'topic2']
}
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()
Example with filesystem transport
...
engine_conf = {
    'engine': 'worker-based',
    'exchange': 'test-exchange',
    'topics': ['topic1', 'topic2'],
    'transport': 'filesystem',
    'transport_options': {
        'data_folder_in': '/tmp/test',
        'data_folder_out': '/tmp/test'
    }
}
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()
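
Since the worker and the flow are configured separately, it can help to check that the two configurations actually line up. The sketch below assumes, based on the format descriptions above, that both sides must name the same exchange and that the worker's topic must appear in the flow's topics list. The function check_configs is hypothetical and is not provided by TaskFlow.

```python
def check_configs(worker_conf, engine_conf):
    """Return a list of mismatches between a worker and a flow config.

    An empty list means no mismatch was found by these two checks;
    it does not prove that the broker itself is reachable.
    """
    problems = []
    # Executor and worker communicate over one exchange, so names must match.
    if worker_conf.get('exchange') != engine_conf.get('exchange'):
        problems.append('exchange differs')
    # The flow only talks to the worker topics it has been given.
    if worker_conf.get('topic') not in engine_conf.get('topics', []):
        problems.append('worker topic not in engine topics')
    return problems
```

Note that the example values on this page would not pass the second check if they were meant to be used together: the worker examples use the topic 'test-tasks', while the flow examples list 'topic1' and 'topic2'.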