Jump to: navigation, search

Difference between revisions of "TaskFlow/Worker-based Engine"

(Development state)
 
(5 intermediate revisions by one other user not shown)
Line 4: Line 4:
  
 
=== Usage ===
 
=== Usage ===
Let's consider how the <code>worker-based engine</code> can be applied. First, it is needed to configure <code>worker</code> and <code>flow</code> to be used with the <code>worker-based engine</code>. After configuration is done, <code>worker</code> can be started and <code>flow</code> can be run.
+
Let's consider how the <code>worker-based engine</code> can be applied. First, it is needed to configure <code>worker</code> and <code>flow</code> to be used with the <code>worker-based engine</code>. After configuration is done, <code>worker</code> and <code>flow</code> can be started.
  
 
==== Worker configuration====
 
==== Worker configuration====
Line 13: Line 13:
 
* <code>topic</code> - topic name under which workers with the same set of tasks are started;
 
* <code>topic</code> - topic name under which workers with the same set of tasks are started;
 
* <code>tasks</code> - list of tasks worker is able to perform;
 
* <code>tasks</code> - list of tasks worker is able to perform;
* <code>transport</code> - can be used to specify the transport explicitly (for <code>memory</code> and <code>filesystem</code> transports);
+
* <code>transport</code> - can be used to specify the transport explicitly (see available [http://kombu.readthedocs.org/en/latest/userguide/connections.html#amqp-transports transports]);
* <code>transport_options</code> - transport options (<code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code>) transport.
+
* <code>transport_options</code> - transport specific options (for instance, <code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code> transport).
  
 
===== Example with AMQP transport =====
 
===== Example with AMQP transport =====
Line 54: Line 54:
 
* <code>url</code> - broker connection url (see format in kombu [https://kombu.readthedocs.org/en/latest/reference/kombu.html#connection documentation]);
 
* <code>url</code> - broker connection url (see format in kombu [https://kombu.readthedocs.org/en/latest/reference/kombu.html#connection documentation]);
 
* <code>exchange</code> - exchange name in which <code>executor</code> / <code>worker</code> communication is performed;
 
* <code>exchange</code> - exchange name in which <code>executor</code> / <code>worker</code> communication is performed;
* <code>transport</code> - can be used to specify the transport explicitly (for <code>memory</code> and <code>filesystem</code> transports);
+
* <code>topics</code> - list of workers topics to communicate with;
* <code>transport_options</code> - transport options (<code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code>) transport;
+
* <code>transport</code> - can be used to specify the transport explicitly (see available [http://kombu.readthedocs.org/en/latest/userguide/connections.html#amqp-transports transports]);
* <code>workers_info</code> - information about workers (topic-to-tasks map);
+
* <code>transport_options</code> - transport specific options (for instance, <code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code> transport).
  
 
===== Example with AMQP transport =====
 
===== Example with AMQP transport =====
Line 65: Line 65:
 
     'url': 'amqp://guest:guest@localhost:5672//'
 
     'url': 'amqp://guest:guest@localhost:5672//'
 
     'exchange': 'test-exchange',
 
     'exchange': 'test-exchange',
     'workers_info': {
+
     'topics': ['topic1', 'topic2']
        'test-topic': ['tasks:TestTask1', 'tasks:TestTask2']
 
    }
 
 
}
 
}
 
flow = lf.Flow('simple-linear').add(...)
 
flow = lf.Flow('simple-linear').add(...)
Line 80: Line 78:
 
     'engine': 'worker-based',
 
     'engine': 'worker-based',
 
     'exchange': 'test-exchange',
 
     'exchange': 'test-exchange',
 +
    'topics': ['topic1', 'topic2'],
 
     'transport': 'filesystem',
 
     'transport': 'filesystem',
 
     'transport_options': {
 
     'transport_options': {
Line 85: Line 84:
 
         'data_folder_out': '/tmp/test'
 
         'data_folder_out': '/tmp/test'
 
     }
 
     }
    'workers_info': {
 
        'test-topic': ['tasks:TestTask1', 'tasks:TestTask2']
 
    }
 
 
 
}
 
}
 
flow = lf.Flow('simple-linear').add(...)
 
flow = lf.Flow('simple-linear').add(...)
Line 94: Line 89:
 
eng.run()
 
eng.run()
 
</pre>
 
</pre>
 
=== Development state ===
 
Currently, basic and fully working implementation of the <code>worker-based engine</code> is implemented. But it is not recommended to use it in the production yet due to the following issues that are needed to be resolved:
 
* '''Collecting information from workers'''. Currently, it is necessary to specify worker topics and their tasks in the engine configuration. But this information should be collected from workers automatically using some kind of broadcast messages. This will resolve problem with adding new workers on the fly too.
 
* '''Engine failover'''. Currently, worker send task results without getting confirmation from the engine that it was delivered. In case when task is being performed on worker and engine failed unexpectedly, worker will send results to engine that won't get it. This may lead to some problems.
 
* '''Worker failover'''. Currently, if worker started task execution and then failed unexpectedly, executor will wait forever until task is finished. This behavior may lead to flow hanging.
 
Some kind of keep-alive messages should be sent from worker, so executor would know that task is still being performed. Otherwise, mark request as failed.
 

Latest revision as of 19:49, 28 April 2014

Worker-based engine

Communication protocol

Worker-based engine communication protocol is described here.

Usage

Let's consider how the worker-based engine can be applied. First, it is needed to configure worker and flow to be used with the worker-based engine. After configuration is done, worker and flow can be started.

Worker configuration

Format
  • url - broker connection url (see format in kombu documentation);
  • exchange - exchange name in which executor / worker communication is performed;
  • topic - topic name under which workers with the same set of tasks are started;
  • tasks - list of tasks worker is able to perform;
  • transport - can be used to specify the transport explicitly (see available transports);
  • transport_options - transport specific options (for instance, data_folder_in and data_folder_out parameters for the filesystem transport).
Example with AMQP transport
from taskflow.engines.worker_based import worker as w

config = {
    'url': 'amqp://guest:guest@localhost:5672//'
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2']
}
worker = w.Worker(**config)
worker.run()
Example filesystem transport
from taskflow.engines.worker_based import worker as w

config = {
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
    'transport': 'filesystem',
    'transport_options': {
        'data_folder_in': '/tmp/test',
        'data_folder_out': '/tmp/test'
    }
}
worker = w.Worker(**config)
worker.run()

Flow configuration

Format
  • engine - engine type to be used ('worker-based' in our case);
  • url - broker connection url (see format in kombu documentation);
  • exchange - exchange name in which executor / worker communication is performed;
  • topics - list of workers topics to communicate with;
  • transport - can be used to specify the transport explicitly (see available transports);
  • transport_options - transport specific options (for instance, data_folder_in and data_folder_out parameters for the filesystem transport).
Example with AMQP transport
...
engine_conf = {
    'engine': 'worker-based',
    'url': 'amqp://guest:guest@localhost:5672//'
    'exchange': 'test-exchange',
    'topics': ['topic1', 'topic2']
}
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()
Example with filesystem transport
...
engine_conf = {
    'engine': 'worker-based',
    'exchange': 'test-exchange',
    'topics': ['topic1', 'topic2'],
    'transport': 'filesystem',
    'transport_options': {
        'data_folder_in': '/tmp/test',
        'data_folder_out': '/tmp/test'
    }
}
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()