Jump to: navigation, search

Difference between revisions of "Zaqar/bp/placement-service"

(Added whiteboard screenshot)
(Add: storage placement algorithm, redis caching, policy engine, placement API, future discussion)
Line 24: Line 24:
 
* A cache maintained in each Marconi instance of the mappings
 
* A cache maintained in each Marconi instance of the mappings
 
* A migration service
 
* A migration service
 +
 +
=== Placement API ===
 +
 +
The placement service must handle:
 +
 +
* storage assignment
 +
* cataloging
 +
* migrations
 +
 +
An API must be exposed to facilitate these operations.
 +
 +
==== Storage Management ====
 +
 +
* Add node (POST /v1/storage: {'location': 'mongo1.storage.com' , 'weight': 100})
 +
* Delete node (DELETE /v1/storage/mongo1.storage.com)
 +
* Change weight (PUT /v1/storage/mongo1.storage.com {'weight': 50}
 +
* Query storage (GET /v1/storage => [...], GET /v1/storage/mongo1.storage.com => {'weight': 100})
 +
 +
==== Catalogue Management ====
 +
 +
* Add entry (POST)
 +
* Update an entry (PUT)
 +
* Remove entry (DELETE)
 +
 +
==== Migration ====
 +
 +
This means: "move object from location A to location B". In terms of the Placement service, it must:
 +
 +
* Update the storage write location
 +
* Update the storage read location
 +
* Update the entry status ("a" => "m")
  
 
=== Catalogue ===
 
=== Catalogue ===
Line 29: Line 60:
 
The catalog, as described above, maintains these mappings from queues to locations.
 
The catalog, as described above, maintains these mappings from queues to locations.
  
It is strictly an optimization. Marconi can continue to function in a degraded state if the placement service goes down. The data contained in the catalogue can be regenerated by querying each existing queue while disabling data migration and caching.
+
Marconi can continue to function in a degraded state if the placement service goes down. The following capabilities will cease to function for Marconi if the placement service goes down:
  
The catalogue contains a series of structures that look as follows:
+
* Migration
 +
* Queue creation
 +
 
 +
It may be possible to continue performing queue creation and deletion while the placement service is down by defaulting to the last storage location that Marconi held.
 +
 
 +
The data contained in the catalogue can be regenerated by querying each Marconi endpoint and synchronizing against each cache.
 +
 
 +
The Marconi catalogue contains a series of structures that look as follows:
  
 
<pre><nowiki>
 
<pre><nowiki>
Line 38: Line 76:
 
         "r": ["mongo://192.168.1.105:7777", "mongo://192.168.1.106:7777"],
 
         "r": ["mongo://192.168.1.105:7777", "mongo://192.168.1.106:7777"],
 
         "w": "mongo://192.168.1.105:7777",
 
         "w": "mongo://192.168.1.105:7777",
         "s": 0
+
         "s": "a"
 
     }
 
     }
 
}
 
}
Line 46: Line 84:
  
 
The catalogue is filled with many such documents. The local cache for a given Marconi node is populated with the entries from this cache, to avoid lookups on each request.
 
The catalogue is filled with many such documents. The local cache for a given Marconi node is populated with the entries from this cache, to avoid lookups on each request.
 +
 +
==== Adding Items to the Catalogue ====
 +
 +
Whenever a Marconi queue is created, a hook is called that transfers control the the placement service. It then becomes the placement service's responsibility to:
 +
 +
* Take the given project ID and namespace (queue name)
 +
* Assign a read storage location to it
 +
* Assign a write storage location to it
 +
* Store these locations in the catalogue
 +
* Return these locations to the Marconi worker
 +
 +
==== Removing Items from the Catalogue ====
 +
 +
When a Marconi queue has been deleted, a hook must be called to notify the Placement service. The placement service must then remove the entry from its catalogue and notify all subscribed Marconi listeners.
 +
 +
=== Placement Policy Engine ===
 +
 +
The placement service will use a weighted distribution to determine storage assignment for newly created queues. However, for certain tenants, we might want to assign them to special or dedicated storage.
 +
 +
A weighted distribution might look like:
 +
 +
<pre><nowiki>
 +
{
 +
    "mongo1.storage.com": 30,
 +
    "mongo2.storage.com": 20,
 +
    "mongo3.storage.com": 30,
 +
    "mongo4.storage.com": 75,
 +
    "mongo5.storage.com": 5,
 +
}
 +
</nowiki></pre>
 +
 +
This means that the probability of a queue being stored on mongo1 is (30)/(30 + 20 + 30 + 75 + 5) => 18.75%.
 +
 +
The policy engine is responsible for maintaining this list of special tenants and their preferred mappings. Whenever a request to create a queue is received, Placement iterates through this list, and if the tenant is found, the associated dedicated storage is returned. If no such tenant is found, then the assignment of storage to the queue is determined by Placement.
 +
 +
==== Dynamic Weight Management ====
 +
 +
The operator of the placement service can manually determine the weights of the storage locations to bootstrap the system. However, in the future, it would be preferred to dynamically update these weights based on host parameters such as:
 +
 +
* Storage location CPU load
 +
* Storage location remaining capacity
 +
 +
This adds a level of intelligence to the placement storage layer that makes maintenance a more hands-free experience.
  
 
=== Marconi Cache ===
 
=== Marconi Cache ===
Line 58: Line 139:
  
 
Furthermore, to reduce inconsistency, there's another method to handle cache updates - periodic refresh.
 
Furthermore, to reduce inconsistency, there's another method to handle cache updates - periodic refresh.
 +
 +
=== Marconi Cache: Using Redis ===
 +
 +
If the Marconi catalog and the local caches are implemented using Redis instances, the invalidation procedure can be simplified. Established as a Redis pubsub pattern, the catalog publishes on the cache channel and all listening Marconi workers update their local cache accordingly. Simple messages in the form of:
 +
 +
* <action> <queue> [<destination>]
 +
* Where action is one of: delete, create, migrate
 +
* Where destination is valid only for migrate, and if left unspecified, leaves the decision of destination up the placement service.
  
 
==== Periodic Refresh ====
 
==== Periodic Refresh ====
Line 76: Line 165:
  
 
It seems likely that some form of administrative API will need to be exposed for Marconi to handle communication between the placement service and Marconi needs. It might be necessary for cache handling operations and for triggering migrations.
 
It seems likely that some form of administrative API will need to be exposed for Marconi to handle communication between the placement service and Marconi needs. It might be necessary for cache handling operations and for triggering migrations.
 +
 +
=== Placement Service API ===
 +
 +
== The Future ==
 +
 +
Even though the placement service is being crafted to handle Marconi's needs, the idea behind it is common to other projects that have data to store and need to keep the storage locations dynamic. At some point, it would not be unreasonable to break Placement out. The current plan is to postpose the decision to do so until Marconi has achieved incubation status.

Revision as of 17:34, 5 August 2013

Overview

Marconi depends on storage for maintaining state on messages. However, as it is designed now, this is a scaling bottleneck because at most one storage node can be specified per Marconi instance. This can be partially offset by taking advantage of built-in sharding and replication for storage providers like MongoDB, but many other storage engines do not provide such a system. Furthermore, the migration of queues from one storage node to another is not supported cleanly by most storage providers. More intelligent data migration can be performed by taking into account that Marconi operates at the level of queues.

The placement service aims to address this by providing a means to specify dynamically where storage for Marconi resources should be performed. It is responsible specifically for:

  • Maintaining a catalogue, a set of mapping of (Project + Queue) => storage node locations
  • Handling queue creation transparently
  • Handling migration transparently
  • Handling deletion of queues transparently

Here, transparency must exist at two levels: to the user and to the implementation of the storage driver. Transparency to the user presents itself in the form of availability and use of the Marconi service must not be interrupted when a migration is taking place. Transparency to the storage drivers means that the storage driver is handed a location/connection and only cares about the serialization/deserialization of data to that storage location.

Approach

A placement service consists of the following components:

  • A catalogue of mappings: {queue => [read locations], queue => [write locations], queue => status}
   * where status is one of ACTIVE or MIGRATING
  • A cache maintained in each Marconi instance of the mappings
  • A migration service

Placement API

The placement service must handle:

  • storage assignment
  • cataloging
  • migrations

An API must be exposed to facilitate these operations.

Storage Management

  • Add node (POST /v1/storage: {'location': 'mongo1.storage.com' , 'weight': 100})
  • Delete node (DELETE /v1/storage/mongo1.storage.com)
  • Change weight (PUT /v1/storage/mongo1.storage.com {'weight': 50}
  • Query storage (GET /v1/storage => [...], GET /v1/storage/mongo1.storage.com => {'weight': 100})

Catalogue Management

  • Add entry (POST)
  • Update an entry (PUT)
  • Remove entry (DELETE)

Migration

This means: "move object from location A to location B". In terms of the Placement service, it must:

  • Update the storage write location
  • Update the storage read location
  • Update the entry status ("a" => "m")

Catalogue

The catalog, as described above, maintains these mappings from queues to locations.

Marconi can continue to function in a degraded state if the placement service goes down. The following capabilities will cease to function for Marconi if the placement service goes down:

  • Migration
  • Queue creation

It may be possible to continue performing queue creation and deletion while the placement service is down by defaulting to the last storage location that Marconi held.

The data contained in the catalogue can be regenerated by querying each Marconi endpoint and synchronizing against each cache.

The Marconi catalogue contains a series of structures that look as follows:

{
    "{project}.{queue}" : {
        "r": ["mongo://192.168.1.105:7777", "mongo://192.168.1.106:7777"],
        "w": "mongo://192.168.1.105:7777",
        "s": "a"
    }
}

An entry in the queue is found by concatenating a project with a queue name. "r" is the collection of locations where data for a particular queue can be gathered from. "w" is the location where new data written to this queue is stored. "s" is the state of the queue: active or migrating.

The catalogue is filled with many such documents. The local cache for a given Marconi node is populated with the entries from this cache, to avoid lookups on each request.

Adding Items to the Catalogue

Whenever a Marconi queue is created, a hook is called that transfers control the the placement service. It then becomes the placement service's responsibility to:

  • Take the given project ID and namespace (queue name)
  • Assign a read storage location to it
  • Assign a write storage location to it
  • Store these locations in the catalogue
  • Return these locations to the Marconi worker

Removing Items from the Catalogue

When a Marconi queue has been deleted, a hook must be called to notify the Placement service. The placement service must then remove the entry from its catalogue and notify all subscribed Marconi listeners.

Placement Policy Engine

The placement service will use a weighted distribution to determine storage assignment for newly created queues. However, for certain tenants, we might want to assign them to special or dedicated storage.

A weighted distribution might look like:

{
    "mongo1.storage.com": 30,
    "mongo2.storage.com": 20,
    "mongo3.storage.com": 30,
    "mongo4.storage.com": 75,
    "mongo5.storage.com": 5,
}

This means that the probability of a queue being stored on mongo1 is (30)/(30 + 20 + 30 + 75 + 5) => 18.75%.

The policy engine is responsible for maintaining this list of special tenants and their preferred mappings. Whenever a request to create a queue is received, Placement iterates through this list, and if the tenant is found, the associated dedicated storage is returned. If no such tenant is found, then the assignment of storage to the queue is determined by Placement.

Dynamic Weight Management

The operator of the placement service can manually determine the weights of the storage locations to bootstrap the system. However, in the future, it would be preferred to dynamically update these weights based on host parameters such as:

  • Storage location CPU load
  • Storage location remaining capacity

This adds a level of intelligence to the placement storage layer that makes maintenance a more hands-free experience.

Marconi Cache

This is one of the tricky aspects of the placement service. In the face of data migrations and queue deletions, the cache must be invalidated and updated appropriately.

The current proposed caching scheme is as follows:

  1. Populate the cache when a Marconi instance is brought up
  2. Process a request from the cache whenever possible
  3. If a cache lookup fails, refer to the placement service catalogue - update the cache

Furthermore, to reduce inconsistency, there's another method to handle cache updates - periodic refresh.

Marconi Cache: Using Redis

If the Marconi catalog and the local caches are implemented using Redis instances, the invalidation procedure can be simplified. Established as a Redis pubsub pattern, the catalog publishes on the cache channel and all listening Marconi workers update their local cache accordingly. Simple messages in the form of:

  • <action> <queue> [<destination>]
  • Where action is one of: delete, create, migrate
  • Where destination is valid only for migrate, and if left unspecified, leaves the decision of destination up the placement service.

Periodic Refresh

On a separate Marconi "thread", poll the catalogue service periodically (say, every 10 seconds). This actor is responsible for updating the cache. It queries the catalogue service and looks for changes.

To enable this, migrations are only allowed a granularity of 5 minutes. This helps avoid race conditions on a catalogue resource, since the migration itself triggers a state change from active to migrating for a particular queue.

Push Refresh

This approach does away with time and puts the responsibility of invalidating caches on the placement service. All Marconi nodes must maintain a listen port connected to the placement service, and whenever a migration occurs, Marconi nodes receive updates on queues that are being affected.

Deletions

Deletions take priority over migrations. If a migration is in progress for Q1, and a request to delete Q1 is made, then all messages for Q1 are deleted both from the initial storage location and the destination storage location. The migration is cancelled.

Further Details

It seems likely that some form of administrative API will need to be exposed for Marconi to handle communication between the placement service and Marconi needs. It might be necessary for cache handling operations and for triggering migrations.

Placement Service API

The Future

Even though the placement service is being crafted to handle Marconi's needs, the idea behind it is common to other projects that have data to store and need to keep the storage locations dynamic. At some point, it would not be unreasonable to break Placement out. The current plan is to postpose the decision to do so until Marconi has achieved incubation status.