Jump to: navigation, search

Difference between revisions of "TaskFlow/Worker-based Engine"

(Worker-based flow)
(Development state)
 
(10 intermediate revisions by one other user not shown)
Line 1: Line 1:
 
== Worker-based engine ==
 
== Worker-based engine ==
=== Development state ===
+
=== Communication protocol ===
 +
Worker-based engine communication protocol is described [[TaskFlow/Worker-based_Engine_Protocol|here]].
 +
 
 +
=== Usage ===
 +
Let's consider how the <code>worker-based engine</code> can be applied. First, it is needed to configure <code>worker</code> and <code>flow</code> to be used with the <code>worker-based engine</code>. After configuration is done, <code>worker</code> and <code>flow</code> can be started.
 +
 
 +
==== Worker configuration====
 +
 
 +
===== Format =====
 +
* <code>url</code> - broker connection url (see format in kombu [https://kombu.readthedocs.org/en/latest/reference/kombu.html#connection documentation]);
 +
* <code>exchange</code> - exchange name in which <code>executor</code> / <code>worker</code> communication is performed;
 +
* <code>topic</code> - topic name under which workers with the same set of tasks are started;
 +
* <code>tasks</code> - list of tasks worker is able to perform;
 +
* <code>transport</code> - can be used to specify the transport explicitly (see available [http://kombu.readthedocs.org/en/latest/userguide/connections.html#amqp-transports transports]);
 +
* <code>transport_options</code> - transport specific options (for instance, <code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code> transport).
 +
 
 +
===== Example with AMQP transport =====
 +
<pre>
 +
from taskflow.engines.worker_based import worker as w
 +
 
 +
config = {
 +
    'url': 'amqp://guest:guest@localhost:5672//'
 +
    'exchange': 'test-exchange',
 +
    'topic': 'test-tasks',
 +
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2']
 +
}
 +
worker = w.Worker(**config)
 +
worker.run()
 +
</pre>
 +
 
 +
===== Example filesystem transport =====
 +
<pre>
 +
from taskflow.engines.worker_based import worker as w
 +
 
 +
config = {
 +
    'exchange': 'test-exchange',
 +
    'topic': 'test-tasks',
 +
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
 +
    'transport': 'filesystem',
 +
    'transport_options': {
 +
        'data_folder_in': '/tmp/test',
 +
        'data_folder_out': '/tmp/test'
 +
    }
 +
}
 +
worker = w.Worker(**config)
 +
worker.run()
 +
</pre>
 +
 
 +
==== Flow configuration ====
 +
 
 +
===== Format =====
 +
* <code>engine</code> - engine type to be used ('worker-based' in our case);
 +
* <code>url</code> - broker connection url (see format in kombu [https://kombu.readthedocs.org/en/latest/reference/kombu.html#connection documentation]);
 +
* <code>exchange</code> - exchange name in which <code>executor</code> / <code>worker</code> communication is performed;
 +
* <code>topics</code> - list of workers topics to communicate with;
 +
* <code>transport</code> - can be used to specify the transport explicitly (see available [http://kombu.readthedocs.org/en/latest/userguide/connections.html#amqp-transports transports]);
 +
* <code>transport_options</code> - transport specific options (for instance, <code>data_folder_in</code> and <code>data_folder_out</code> parameters for the <code>filesystem</code> transport).
 +
 
 +
===== Example with AMQP transport =====
 +
<pre>
 +
...
 +
engine_conf = {
 +
    'engine': 'worker-based',
 +
    'url': 'amqp://guest:guest@localhost:5672//'
 +
    'exchange': 'test-exchange',
 +
    'topics': ['topic1', 'topic2']
 +
}
 +
flow = lf.Flow('simple-linear').add(...)
 +
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
 +
eng.run()
 +
</pre>
 +
 
 +
===== Example with filesystem transport =====
 +
<pre>
 +
...
 +
engine_conf = {
 +
    'engine': 'worker-based',
 +
    'exchange': 'test-exchange',
 +
    'topics': ['topic1', 'topic2'],
 +
    'transport': 'filesystem',
 +
    'transport_options': {
 +
        'data_folder_in': '/tmp/test',
 +
        'data_folder_out': '/tmp/test'
 +
    }
 +
}
 +
flow = lf.Flow('simple-linear').add(...)
 +
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
 +
eng.run()
 +
</pre>

Latest revision as of 19:49, 28 April 2014

Worker-based engine

Communication protocol

Worker-based engine communication protocol is described here.

Usage

Let's consider how the worker-based engine can be applied. First, it is needed to configure worker and flow to be used with the worker-based engine. After configuration is done, worker and flow can be started.

Worker configuration

Format
  • url - broker connection url (see format in kombu documentation);
  • exchange - exchange name in which executor / worker communication is performed;
  • topic - topic name under which workers with the same set of tasks are started;
  • tasks - list of tasks worker is able to perform;
  • transport - can be used to specify the transport explicitly (see available transports);
  • transport_options - transport specific options (for instance, data_folder_in and data_folder_out parameters for the filesystem transport).
Example with AMQP transport
from taskflow.engines.worker_based import worker as w

config = {
    'url': 'amqp://guest:guest@localhost:5672//'
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2']
}
worker = w.Worker(**config)
worker.run()
Example filesystem transport
from taskflow.engines.worker_based import worker as w

config = {
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
    'transport': 'filesystem',
    'transport_options': {
        'data_folder_in': '/tmp/test',
        'data_folder_out': '/tmp/test'
    }
}
worker = w.Worker(**config)
worker.run()

Flow configuration

Format
  • engine - engine type to be used ('worker-based' in our case);
  • url - broker connection url (see format in kombu documentation);
  • exchange - exchange name in which executor / worker communication is performed;
  • topics - list of workers topics to communicate with;
  • transport - can be used to specify the transport explicitly (see available transports);
  • transport_options - transport specific options (for instance, data_folder_in and data_folder_out parameters for the filesystem transport).
Example with AMQP transport
...
engine_conf = {
    'engine': 'worker-based',
    'url': 'amqp://guest:guest@localhost:5672//'
    'exchange': 'test-exchange',
    'topics': ['topic1', 'topic2']
}
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()
Example with filesystem transport
...
engine_conf = {
    'engine': 'worker-based',
    'exchange': 'test-exchange',
    'topics': ['topic1', 'topic2'],
    'transport': 'filesystem',
    'transport_options': {
        'data_folder_in': '/tmp/test',
        'data_folder_out': '/tmp/test'
    }
}
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()