= Queue Service =

<<[[TableOfContents]]()>>
This document is a proposal and plan for a queue service; it is not an official [[OpenStack]] project yet. Once there is an implementation to evaluate, it will be submitted to the [[OpenStack]] [[Governance/PPB|Project Policy Board]] for approval.
  
 
This project has been started on Launchpad under the name "Burrow". You can find it at: https://launchpad.net/burrow

Most of the docs that were once here are now located on the new burrow documentation site at: http://burrow.openstack.org/
  
The reasons for initiating a new project instead of using an existing one are simplicity, modularity, and scale. Very few (if any) existing queue systems were built with multi-tenant cloud use cases in mind, and very few have a simple and extensible REST API. It would be possible to build an AMQP-based service, but AMQP brings complexity and a protocol that is not optimized for high latency and intermittent connectivity. The benefits and constraints of a distributed, multi-tenant environment (cloud services) require a different approach than single-tenant and/or clustered services.
 
 
== Prototype ==
 
 
 
A prototype of the queue service written in Python is available at: http://bazaar.launchpad.net/~eday/+junk/osq/view/head:/osq.py
 
 
 
If you have bzr installed, you can also: '''bzr branch lp:~eday/+junk/osq'''
 
 
 
The server is fully functional except for persistence options, account verification (all requests are allowed), and some error handling. It works, but it is certainly not optimized: many operations are O(n) (or worse) when they could be O(1) with more complex data structures. In other words, this currently won't scale. :)
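
As a sketch of the data-structure point (this is illustrative, not code from the osq.py prototype), an OrderedDict per queue gives O(1) insert, O(1) lookup by message id, and O(1) FIFO pop, where a plain list would need O(n) scans. It also mirrors the queue semantics described below: same-id inserts overwrite, and empty queues vanish.

<pre>
# Illustrative sketch only, not taken from the osq.py prototype.
from collections import OrderedDict

queues = {}  # (account, queue_name) -> OrderedDict of id -> body

def insert(account, queue, msg_id, body):
    q = queues.setdefault((account, queue), OrderedDict())
    q[msg_id] = body  # inserting the same id overwrites the old message

def pop_fifo(account, queue):
    q = queues.get((account, queue))
    if not q:
        return None
    msg_id, body = q.popitem(last=False)  # oldest message first
    if not q:
        del queues[(account, queue)]  # queues vanish when empty
    return msg_id, body
</pre>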
 
 
 
== Requirements ==
 
 
 
The primary goals of the queue service are:
 
 
 
* Simple - Initially it will expose a simple REST-based API to make it easy to access from any language. It should not require much setup, if any, before applications can start pushing messages into it.
 
* Modular API - Initially the focus will be on a simple REST API, but this will not in any way be a first-class API. It should be possible to add other protocols (AMQP, JMS, direct protocol buffers, Gearman, etc.) for other use cases. Note that the internal service API will not always provide a 1-1 mapping with the external API, so some features may be unavailable with some protocol modules.
 
* Fast - Since this will act as a building block for other services that may drive heavy throughput, performance will have a focus. This mostly comes down to the implementation language and how clients and workers interact with the server to reduce network chatter.
 
* Multi-tenant - Support multiple accounts for the service, and since this will also be a public service for some deployments, protect against potentially malicious users.
 
* Persistent - Allow messages to optionally be persistent. For protocols that can support it, this can be an optional flag when the message is submitted. The persistent storage should also be modular so various data stores can be tested and to accommodate different deployment options.
 
* Zones and Locality Awareness - As has been discussed on the [[OpenStack]] mailing list, locality in cloud services is an important feature. When dealing with where messages should be processed, there needs to be location awareness to process data where it exists to reduce network overhead and processing time.
 
* No Single Point of Failure - This goes without saying, but the service should not rely on a single component.
 
* Horizontally Scalable for Large Deployments - While it will be appropriate for small deployments, the service must support very large deployments such as a public cloud service.
 
 
 
== Components ==
 
 
 
The queue service will consist of the following components:
 
 
 
* Client - An external application that inserts or updates messages in the queue.
 
* Worker - An external application that reads, updates, or deletes messages in the queue. Workers need to be idempotent, as the queue service may deliver the same message more than once in some failover situations. Note that the same application can be both a client and a worker; the differentiation is only made based on typical behaviors, not on what they are capable of doing.
 
 
 
=== Services ===
 
 
 
* Server - This is the service that messages are routed through. Clients will insert messages and workers will receive messages. Multiple servers may be run for high availability, and servers do not need to be aware of one another.
 
* Proxy - This is an optional service that sits between the server and clients/workers. It allows clients and workers to connect to a single address, which then proxies to multiple servers. This is only required for extremely large deployments (such as a public cloud service) and will not be implemented until at least the second major milestone.
 
 
 
=== Data ===
 
 
 
* Accounts - This represents a user, application, or some other account context. Every message is associated with an account, and accounts can have ACLs to control the CRUD operations for other accounts and queues. ACL entries may have some simple pattern matching capabilities.
 
* Queue - A queue is simply a namespace; it has no metadata associated with it. Queue names are unique within an account context. Queues cannot be explicitly created or destroyed: a queue automatically appears to clients and workers when a message exists for the queue name, and is automatically removed when all messages for that queue name are deleted.
 
* Messages - A message consists of a unique ID, metadata, and a body. The unique ID is composed of the account, queue name, and a message ID. A server will only contain a single message for a given unique ID. If two messages are inserted with the same unique ID, the last one overwrites the first.
 
 
 
==== Message Metadata ====
 
 
 
* ttl=SECONDS - How long the message will exist in the queue.
 
* hide=SECONDS - How long the message should be hidden from list requests. This allows for delayed insert of messages and "in progress" time when a worker is processing the message but is not ready to delete it.
 
* persist=none|semi|full - Set the delivery guarantee. This can be quantified in an SLA per deployment depending on the backing data stores used. This allows for fast/unreliable messages (memory-only queues), semi-reliable messages (backing store without replication and/or sync to disk), and slow/reliable messages (synced to disk and/or replicated).
 
 
 
The persist option is only available on insert; the other options are available on insert and update. All metadata will have tunable default values per deployment. In the future there may also be user-defined metadata, but this will be considered later.
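
As a minimal sketch of how these rules might be enforced (the default values and the helper function are assumptions for illustration, not part of the proposal):

<pre>
# Hypothetical metadata handling; the defaults are placeholders that
# a deployment would tune, not values from the proposal.
DEFAULTS = {"ttl": 600, "hide": 0, "persist": "none"}

def apply_metadata(existing, updates, is_insert):
    """Merge metadata updates, enforcing that persist is insert-only."""
    meta = dict(DEFAULTS) if is_insert else dict(existing)
    for key, value in updates.items():
        if key not in DEFAULTS:
            raise ValueError("unknown metadata: %s" % key)
        if key == "persist" and not is_insert:
            raise ValueError("persist may only be set on insert")
        meta[key] = value
    return meta
</pre>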
 
 
 
== Operations for HTTP API ==
 
 
 
The HTTP API will be versioned by using the first part of the path to allow changes to be introduced while maintaining backwards compatibility.
 
 
 
* GET / - List all supported versions.
 
* GET /version - List all accounts that have messages in them.
 
* GET /version/account - List all queues that have messages in them.
 
* GET /version/account/queue - List all messages in the queue.
 
* GET /version/account/queue/id - Return the message with the given id.
 
* PUT /version/account/queue - Insert a message and automatically generate a unique message id.
 
* PUT /version/account/queue/id - Insert a message with the given id, overwriting a previous message with the id if one existed.
 
* POST /version/account/queue - Update the metadata for all messages in the queue.
 
* POST /version/account/queue/id - Update the metadata for the message with the given id.
 
* DELETE /version/account - Remove all messages in the account.
 
* DELETE /version/account/queue - Remove all messages in the queue.
 
* DELETE /version/account/queue/id - Remove the message with the given id.
 
 
 
The GET, POST, and DELETE methods for queues can return the messages that match the parameters listed below. These methods, as well as methods on individual messages, can also specify what details of the messages to return.
 
 
 
The GET method on root, version, or account returns a list of the appropriate resources. The DELETE method on an account returns nothing.
 
 
 
The PUT method without a message id will either not be available, or available only to proxy servers. The queue servers should not generate one, since they do not know whether the client or proxy hashing algorithm will map the generated id back to the given server.
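
To make the operations above concrete, here is a hedged client sketch using the Python requests library; the host, port, and exact version prefix are assumptions, since the proposal does not fix them:

<pre>
# Hypothetical client session against the proposed HTTP API.
# Host, port, and version prefix are assumptions for illustration.
import requests

BASE = "http://localhost:8080/v1.0"

# Insert a message with a client-chosen id (PUT with id).
requests.put(BASE + "/account_1/queue_A/msg_123",
             data=b"resize image 42",
             params={"ttl": 600})

# List the messages currently visible in the queue.
resp = requests.get(BASE + "/account_1/queue_A", params={"detail": "all"})
print(resp.status_code, resp.text)

# Delete the message once processed.
requests.delete(BASE + "/account_1/queue_A/msg_123")
</pre>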
 
 
 
=== Optional GET, POST, and DELETE parameters ===
 
 
 
* limit=COUNT - Limit the number of matched messages. This allows a worker to grab as few or as many messages as it can handle at once. For example, a worker resizing images may only grab one at a time, but a worker processing log messages may grab many for efficient batch processing.
 
* marker=ID - Only match IDs after the given ID. This allows for multi-cast messages by always having a 'hide' value of 0 and not deleting messages (let the TTL delete them automatically).
 
* hidden=true|false - Whether to match messages that are currently hidden. The default should be false for queues, but true for message-specific operations.
 
* wait=SECONDS - Wait the given time period for a new message if no messages currently match. This allows workers to long-poll if needed (see the worker sketch after this list).
 
* detail=none|id|metadata|all - What details to return in the response body.
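
Putting several of these parameters together, a long-polling worker might look like the following sketch. The JSON response shape and the "no message" status handling are assumptions, since the proposal does not define a wire format.

<pre>
# Hypothetical long-polling worker; response shape is an assumption.
import requests

URL = "http://localhost:8080/v1.0/account_1/queue_A"

def process(msg):
    print("processing", msg["id"])  # stand-in for real work

while True:
    # Atomically grab up to 10 visible messages and hide them for 60s.
    resp = requests.post(URL, params={"wait": 60, "hide": 60,
                                      "limit": 10, "detail": "all"})
    if resp.status_code != 200:  # assumed: timed out with no messages
        continue
    for msg in resp.json():      # assumed: a list of message objects
        process(msg)
        requests.delete(URL + "/" + msg["id"])  # done, remove it
</pre>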
 
 
 
== Behavior and Constraints ==
 
 
 
The default queue behavior is FIFO, although LIFO should be a trivial addition (this will be revisited later). Message insert and retrieval order is only guaranteed for a single queue server. When used along with a proxy, or when a client uses multiple queue servers, it is possible that newer messages from one queue server will be returned before older messages in another. The proxy server will make a best effort to drain the queue servers evenly, but strict message ordering is not guaranteed. Applications that require strict ordering will need to use some external mechanism to guarantee this.
 
 
 
In certain failure scenarios, such as a queue server crash, it is possible, depending on deployment configuration, for messages to either be lost or duplicated. Lost messages should only happen if they were inserted with no persistence (such as an in-memory queue or some other non-persistent backing store). It is up to each deployment to define its guarantees according to which message persistence modules it chooses to offer. For example, a queue server backed by MySQL will only be as durable as the MySQL configuration, which can vary from in-memory only to extremely durable (commit to disk and replicated).
 
 
 
Duplicate messages can happen when a worker pulls a message to work on, the server crashes, the worker finishes, and the message delete call fails. When the server comes back up, the message will appear in the queue again (assuming it was in a persistent message store). Applications can handle this in a variety of ways, from simply running the job twice, to rolling back changes on the delete failure, to tracking unique message IDs to ensure they are processed only once.
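
One of the strategies mentioned above, tracking unique message IDs, might look like this sketch (in-memory only for illustration; a real worker would persist the set so it survives restarts):

<pre>
# Illustrative dedup by message id; a real worker would persist this
# set (e.g. in a database) rather than keep it in memory.
seen = set()

def do_work(body):
    print("working on", body)  # stand-in for the real job

def handle(msg_id, body):
    if msg_id in seen:
        return              # duplicate delivery after a failover; skip
    do_work(body)           # must complete before the id is recorded
    seen.add(msg_id)
</pre>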
 
 
 
A client used with a single proxy server (single IP, possibly multiple servers behind an HA load balancer) will only need to concern itself with one server and send all messages to it. Clients have the option to use a list of servers in two ways. For example, in the "Simple HA" example deployment below, the client could either hash on the message ID and send to the appropriate server in its list (for spreading the load) or fail over to another server if the first server is down (for simple HA). It is possible for clients to do both with the "Public Cloud Service" deployment example, where instead of using multiple queue servers, it uses multiple proxy servers. For a multi-zone deployment, this would allow a client to either spread the load over all zones or fail over to a secondary zone should the first go down.
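
A sketch of the two client-side strategies described above, hashing the message ID to spread load and keeping the rest of the list as failover candidates (server addresses and the hash function are placeholders):

<pre>
# Hypothetical client-side server selection; addresses are placeholders.
import hashlib

SERVERS = ["http://qs1:8080", "http://qs2:8080", "http://qs3:8080"]

def pick_servers(message_id):
    """Return servers in preferred order for this message id."""
    h = int(hashlib.md5(message_id.encode()).hexdigest(), 16)
    start = h % len(SERVERS)
    # Primary first for load spreading, the rest as failover targets.
    return SERVERS[start:] + SERVERS[:start]
</pre>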
 
 
 
== Other Features ==
 
  
 
* Support user-defined metadata.

* Allow secondary indexes on user-defined metadata.

* Support in-process router "workers" for advanced routing and replication configurations. This will require API additions to manage the routing rules.
 
== Milestones and Feature Priorities ==
 
 
=== Cactus Release (April) ===
 
 
* Queue "kernel" module with basic functionality.
 
* Basic HTTP API module.
 
* Account API with a simple no-op module.
 
 
=== Diablo Release (July) ===
 
 
* Extended HTTP API module (support for advanced metadata if not done already).
 
* Real account modules backed by LDAP and/or SQL database.
 
* Persistent message storage API and simple module.
 
 
=== Future Releases ===
 
 
* More persistent message storage modules.
 
* Proxy server module.
 
* More API modules (AMQP, Gearman, ...).
 
  
 


== Examples ==

=== Basic Asynchronous Queue ===

Multiple workers long-poll for messages until a client inserts one. Both workers tell the server to hide the message once it is read, so only one worker will be able to see the message. The POST request from a worker is an atomic get/set operation.


<pre>
Worker1: POST /account/queue?wait=60&hide=60&detail=all (long-polling worker, request blocks until a message is ready)
Worker2: POST /account/queue?wait=60&hide=60&detail=all (long-polling worker, request blocks until a message is ready)
Client: PUT /account/queue (message inserted, returns unique id that was created)
Worker1: Return from the blocking POST request with the new message and process it
Worker1: DELETE /account/queue/id
</pre>


=== Fast Asynchronous Queue ===

In this example, message loss is acceptable and workers have the ability to do batch processing (process multiple messages at once). Workers grab up to 100 messages at a time and tell the server to remove the messages once they are returned. If a worker crashes after the server returns the messages but before they were successfully processed, they are lost. The DELETE request from a worker is an atomic get/delete operation.


<pre>
Worker1: DELETE /account/queue?limit=100&wait=60&detail=all (long-polling worker, request blocks until a message is ready)
Worker2: DELETE /account/queue?limit=100&wait=60&detail=all (long-polling worker, request blocks until a message is ready)
Client: PUT /account/queue (message inserted, returns unique id that was created)
...
Worker1: Return from the blocking DELETE request with all new messages and process them
</pre>


=== Multi-cast Event Notifications ===

This allows multiple workers to read the same message since the message is not hidden or removed by any worker. The server will automatically remove the message once the message TTL has expired.


<pre>
Worker1: GET /account/queue?wait=60
Worker2: GET /account/queue?wait=60
Client: PUT /account/queue/id1?ttl=60
Worker1: Return from blocking GET request with message id1
Worker2: Return from blocking GET request with message id1
Worker1: GET /account/queue?wait=60&marker=id1
Worker2: GET /account/queue?wait=60&marker=id1
</pre>
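
A hedged sketch of such a multi-cast reader, advancing the marker past the last id seen (the JSON response shape is an assumption, since the proposal does not define one):

<pre>
# Hypothetical multi-cast reader: never hides or deletes messages,
# just advances a marker; the TTL cleans messages up server-side.
import requests

URL = "http://localhost:8080/v1.0/account_1/queue_A"
marker = None

while True:
    params = {"wait": 60}
    if marker is not None:
        params["marker"] = marker
    resp = requests.get(URL, params=params)
    if resp.status_code != 200:   # assumed: timed out, poll again
        continue
    for msg in resp.json():       # assumed: a list of message objects
        print("event", msg["id"])
        marker = msg["id"]        # next poll starts after this id
</pre>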


== Example Deployment Configurations ==

Configuration of the queue service will be flexible, and this section describes three sample deployments. Each of the queue servers (green boxes) can optionally have persistence configured for them. Note that queue servers should not share the same backend storage unless the backend storage is suitable for the HA needs of the deployment. The queue servers will not know of or coordinate with one another in any way; the clients and workers (or proxy, in the last example) are responsible for balancing the load between available queue servers and failing over to another server if one goes down.

=== Developer ===

In this deployment, a single queue server is run, and all clients and workers connect to it.

[[File:QueueService$queue service dev.png]]

=== Simple HA ===

In this deployment, multiple queue servers are run, and all clients and workers are given the list of IP addresses for the available queue servers. Clients should either connect to the first available server, or, if they want to distribute load amongst all servers, use some algorithm depending on the message. If all messages are unique, a simple round-robin distribution may be sufficient. For messages with client-side IDs that could possibly have duplicates, the client should use a hashing algorithm on the ID to ensure the same ID hits the same server. This allows the queue servers to coalesce all messages with the same ID. Should a queue server go down, the clients can simply re-balance to the new server set. Workers should poll or remain connected to all queue servers to ensure they can pull a message no matter where it is inserted.

[[File:QueueService$queue service ha.png]]

=== Public Cloud Service ===

In this deployment, proxy servers are placed in front of each cluster of queue servers. The proxy servers manage spreading the load across the queue cluster instead of relying on the clients and workers to manage multiple connections. This is only suitable when the proxy servers are configured redundantly (such as behind an HA load balancer). For a given account ID, all proxy servers in a zone should hash to the same subset of queue servers (with a default max of three), and use that set to spread load across. This is similar to how Swift spreads objects based on the placement in the hash ring. Once the account ID determines the set of queue servers to use, the queue name and message ID (the other components of the unique message ID) determine which queue server in the set to use. The algorithm used in the proxy should be modular, so you can easily alter how many queue servers to use for an account, and how to distribute to them within that set.

[[File:QueueService$queue service pub.png]]

For example, if a client PUTs a message with ID /v1.0/account_1/queue_A/msg_123, the proxy server will parse out the "account_1" component and use that in the hashing algorithm to get a set of queue servers (let's say it returns the set qs1, qs4, qs5). With this set, the proxy then hashes the rest of the ID, "queue_A/msg_123", to determine which queue server to proxy to (let's say it maps to qs4). If a message comes in with the exact same ID, the same algorithm is used to proxy it to the same queue server, possibly allowing the queue server to coalesce the messages so they are processed by a worker only once (eliminating the thundering herd problem). If a queue server in the returned set should fail, the proxy can either run with the two remaining servers or choose a third server until the original node comes back up.
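
A sketch of this two-level hashing (the server names, set size, and hash function are illustrative assumptions, not the actual proxy algorithm):

<pre>
# Illustrative two-level hashing: account -> server set, then
# queue name + message id -> one server within that set.
import hashlib

QUEUE_SERVERS = ["qs1", "qs2", "qs3", "qs4", "qs5", "qs6"]
SET_SIZE = 3  # default of three servers per account

def _hash(s):
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

def servers_for_account(account):
    start = _hash(account) % len(QUEUE_SERVERS)
    ring = QUEUE_SERVERS + QUEUE_SERVERS  # wrap around the list
    return ring[start:start + SET_SIZE]

def server_for_message(account, queue, msg_id):
    candidates = servers_for_account(account)
    return candidates[_hash(queue + "/" + msg_id) % len(candidates)]

# e.g. server_for_message("account_1", "queue_A", "msg_123")
</pre>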

When the proxy is handling worker requests, it will use the same hashing algorithms. When a worker GETs a queue name to read messages, the account portion is parsed and a connection is made to all queue servers handling that account. The proxy will then aggregate messages from those queue servers into one view for the worker to consume. The proxy and queue servers may need to use a more efficient multiplexing protocol that can keep state for multiple accounts and requests, rather than simple REST-based calls, to keep the number of connections reasonable.