Revision as of 14:45, 22 April 2013 by Kgriffs (4. Solutions)

Message Pagination

Blueprint: message-pagination

1. Markers must be decoupled from IDs to allow for future optimizations in storage drivers.

 a. Return as last item in the iterator
 b. Return in body of StopIteration exception
 c. Return marker as part of each doc
           {
               "id": "foo",
               "marker": "foo",
               "ttl": 400,
               "body": {}
           }

(c) is the current recommendation, because it is both efficient and conforms to the Principle of Least Surprise. It is efficient because messages can be streamed and the transport can pull the marker off the last message, meaning the driver does not have to do an additional query to create a (marker, iter) tuple. Furthermore, if marker and id are the same string, duplicating the field costs very little.
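A minimal sketch of option (c), assuming each doc carries a sortable "marker" field and the store is already ordered by it. The names list_messages and stream_page are hypothetical, not part of any driver API, and integer markers are used only to keep the example short.

```python
def list_messages(store, marker=None, limit=2):
    """Hypothetical driver call: lazily yield up to `limit` docs after `marker`."""
    sent = 0
    for doc in store:  # assumption: store is sorted by marker
        if marker is not None and doc["marker"] <= marker:
            continue
        if sent == limit:
            break
        sent += 1
        yield doc


def stream_page(messages, emit):
    """Stream each doc to `emit` and return the marker of the last doc seen."""
    next_marker = None
    for doc in messages:
        emit(doc)
        next_marker = doc["marker"]
    return next_marker


store = [{"id": str(i), "marker": i, "ttl": 400, "body": {}} for i in range(1, 6)]
received = []
marker = stream_page(list_messages(store), received.append)
marker = stream_page(list_messages(store, marker), received.append)
```

The transport never asks the driver for a separate marker; it simply remembers the last doc it streamed.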

2. Requirements

  • Duplicates must not be returned across pages
  • No message may be skipped/dropped/lost

3. Implications

In the case of a sharded/distributed data store, using a timestamp-based marker can cause clients to receive duplicate messages, or even to miss messages.

Consider the following scenario, assuming a marker/ID entirely made up of a timestamp, with a granularity of 1 second (truncated from clock ticks), and assuming no clock skew between storage nodes:

T+0100 ms - Client A posts message M1 and M1 is persisted to shard S1 with ID = T+0000
T+0200 ms - Client X lists messages, receives M1, and marker "T+0000"
T+0300 ms - Client B posts message M2 and M2 is persisted to shard S2 with ID = T+0000
T+0350 ms - Client X lists messages using marker "T+0000", receives M1 (duplicate) and M2

In this case, no messages are missed by Client X, but it does receive a duplicate (M1).
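The first scenario can be replayed in a few lines. This is only a sketch: make_id and list_from are hypothetical helpers, times are milliseconds relative to T, and the listing is assumed to be inclusive (ID >= marker), which is what the scenario implies since M1 comes back a second time.

```python
def make_id(clock_ms, granularity_ms=1000):
    """Truncate a clock reading to the marker granularity."""
    return clock_ms - clock_ms % granularity_ms


def list_from(messages, marker=None):
    """Return names of messages with ID >= marker, plus the new marker."""
    page = [m for m in messages if marker is None or m["id"] >= marker]
    new_marker = max((m["id"] for m in page), default=marker)
    return [m["name"] for m in page], new_marker


messages = [{"name": "M1", "id": make_id(100)}]    # persisted to S1, ID = T+0000
page1, marker = list_from(messages)                # Client X receives M1
messages.append({"name": "M2", "id": make_id(300)})  # persisted to S2, same ID
page2, marker = list_from(messages, marker)        # M1 again, plus M2
```

Switching the comparison to a strict ID > marker would remove the duplicate, but then M2 would be skipped because it shares M1's ID.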

Now, consider what happens in the presence of clock skew. Suppose storage node S1 is running fast, e.g., 1 second ahead of S2. The times in the left column below are taken from S2's clock.

T+0100 ms - Client A posts message M1 and M1 is persisted to shard S1 with ID = T+1000
T+0200 ms - Client X lists messages, receives M1, and marker "T+1000"
T+0300 ms - Client B posts message M2 and M2 is persisted to shard S2 with ID = T+0000
T+0350 ms - Client X lists messages using marker "T+1000", and receives NO MESSAGES.

In this scenario, Client X will never receive M2 because markers are by necessity monotonic, and M2's ID sorts before the marker Client X already holds (T+0000 < T+1000).
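The skew scenario can be checked the same way. In this sketch make_id and list_messages are hypothetical helpers, times are milliseconds relative to T on S2's clock, and skew_ms models how far ahead a node's clock runs.

```python
def make_id(true_ms, skew_ms=0, granularity_ms=1000):
    """ID assigned by a node whose clock runs skew_ms ahead."""
    local_ms = true_ms + skew_ms
    return local_ms - local_ms % granularity_ms


def list_messages(messages, marker, inclusive=False):
    """Return names of messages after the marker (or at/after it if inclusive)."""
    if inclusive:
        return [name for name, msg_id in messages if msg_id >= marker]
    return [name for name, msg_id in messages if msg_id > marker]


m1 = ("M1", make_id(100, skew_ms=1000))  # stored on fast shard S1, ID = 1000
marker = m1[1]                           # Client X's marker after first listing
m2 = ("M2", make_id(300))                # stored on shard S2, ID = 0

strict = list_messages([m1, m2], marker)                   # nothing is returned
lenient = list_messages([m1, m2], marker, inclusive=True)  # only M1, again
```

Either comparison leaves M2 behind the marker, so no choice of inclusive or strict listing recovers it.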

So, why not just use a clock with better resolution? The trouble is, you simply can't achieve a fine enough resolution to avoid collisions in a distributed system. Clock skew and network latency are both inevitable to some degree, effectively limiting the resolution of the system as a whole (even if individual nodes are incredibly precise).

OK, what about adding counters to each node? It is common to generate object IDs in distributed systems by combining a timestamp with a local, looping counter and some information about the node that is creating the ID (e.g., MAC address, process ID, etc.). Using this strategy virtually guarantees that object IDs will be unique, but doesn't necessarily reduce the risk of object IDs being assigned out of order.
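A rough sketch of such an ID scheme follows. IdGenerator is a hypothetical class, the 16-bit counter width is an arbitrary assumption, and the clocks are fixed lambdas so the outcome is repeatable.

```python
import itertools


class IdGenerator:
    """Builds IDs as (timestamp_ms, looping counter, node identifier) tuples."""

    def __init__(self, node_id, clock):
        self.node_id = node_id
        self.clock = clock                # callable returning this node's time in ms
        self.counter = itertools.count()

    def next_id(self):
        # Assumption: the counter loops at 2**16
        return (self.clock(), next(self.counter) % 65536, self.node_id)


node_a = IdGenerator("A", clock=lambda: 1100)  # node A's clock runs 1 s fast
node_b = IdGenerator("B", clock=lambda: 300)

first = node_a.next_id()    # generated first
second = node_b.next_id()   # generated later, yet sorts earlier
```

The two IDs differ, so uniqueness holds, but the ID generated later compares as smaller, which is exactly the ordering problem a marker cannot tolerate.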

But what about using a side-counter? This can still lead to missed messages. Consider:

T+0100 ms - Producer A posts message M1
T+0100 ms - Producer B posts message M2
T+0101 ms - Server A increments side counter and gets ID = 7
T+0101 ms - Server B increments side counter and gets ID = 8
T+0101 ms - Server B inserts M2 with ID = 8
T+0102 ms - Observer X lists messages with marker = 0, and gets new marker = 8
T+0103 ms - Server A inserts M1 with ID = 7
T+0104 ms - Observer X lists messages with marker = 8 and never sees M1!
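The race is between reserving an ID and inserting the message. A deterministic replay, with the interleaving written out by hand rather than produced by real threads, might look like this; side_counter, stored, and list_messages are hypothetical stand-ins for the shared counter and the message store.

```python
import itertools

side_counter = itertools.count(7)  # shared counter; the next values are 7 and 8
stored = {}                        # ID -> message, stands in for the store


def list_messages(marker):
    """Return messages with ID > marker, plus the new marker."""
    ids = sorted(i for i in stored if i > marker)
    return [stored[i] for i in ids], (ids[-1] if ids else marker)


id_a = next(side_counter)              # Server A reserves ID 7 for M1
id_b = next(side_counter)              # Server B reserves ID 8 for M2
stored[id_b] = "M2"                    # Server B inserts first
page1, marker = list_messages(0)       # Observer X sees only M2; marker is now 8
stored[id_a] = "M1"                    # Server A inserts late
page2, marker = list_messages(marker)  # nothing has ID > 8, so M1 is skipped
```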

4. Solutions

a. You can purposely make your clock resolution coarse (say, flooring the timestamp to a 5-second period). In this way, you avoid missed messages, but greatly increase the chance of queries returning duplicate messages based on a given marker. The client or storage driver will either have to cache messages received for the past 5 seconds in order to filter out duplicates, or the server will have to do the same on behalf of the client.
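One way the client-side filtering could look is sketched below. The names are hypothetical, the listing is assumed to be inclusive (marker >= the client's marker), and the cache here is pruned when the marker advances rather than strictly by wall-clock age, which is a simplification.

```python
def coarse_marker(timestamp_s, window_s=5):
    """Floor a timestamp (seconds) to the start of its window."""
    return timestamp_s - timestamp_s % window_s


def list_messages(store, marker):
    """Hypothetical inclusive listing: everything at or after the marker."""
    return [m for m in store if marker is None or m["marker"] >= marker]


class DedupingClient:
    def __init__(self):
        self.marker = None
        self.seen = set()  # (marker, id) pairs already delivered

    def accept(self, page):
        """Return only unseen messages from a page and advance the marker."""
        fresh = [m for m in page if (m["marker"], m["id"]) not in self.seen]
        self.seen.update((m["marker"], m["id"]) for m in page)
        if page:
            self.marker = max(m["marker"] for m in page)
            # Windows older than the marker are never listed again
            self.seen = {s for s in self.seen if s[0] >= self.marker}
        return fresh


store = [{"id": "M1", "marker": coarse_marker(1)}]
client = DedupingClient()
first = client.accept(list_messages(store, client.marker))
store.append({"id": "M2", "marker": coarse_marker(3)})   # same window as M1
second = client.accept(list_messages(store, client.marker))
```

The second listing returns M1 and M2 from the store, but only M2 reaches the application.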

b. You can create one or more oracles that generate, for each scope in which the ID must be unique, a series of monotonic IDs that are also used as markers. Tenants or even collection resources under each tenant may each be assigned different oracles, mitigating the bottleneck of a single ID generator. This oracle must be implemented in the form of a serializing buffer, in order to avoid the race condition in the app server between getting the next ID and inserting the message document. Alternatively, the storage driver can conditionally insert messages only if the expected max ID has not changed, retrying if it has (with some kind of jitter algorithm).
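A serializing buffer can be approximated with a queue drained by a single writer, so that assigning the ID and inserting the document happen as one step. This is a sketch under that assumption; SerializingOracle is a hypothetical class and a dict stands in for the message store of one scope.

```python
import queue
import threading


class SerializingOracle:
    """Single writer: assigns the next ID and inserts in the same step."""

    def __init__(self, store):
        self._store = store            # dict: ID -> message
        self._buffer = queue.Queue()   # producers only ever enqueue
        self._next_id = 1
        worker = threading.Thread(target=self._drain, daemon=True)
        worker.start()

    def post(self, message):
        self._buffer.put(message)

    def _drain(self):
        while True:
            message = self._buffer.get()
            # No other thread touches _next_id, so an ID never becomes
            # visible before every lower ID has been inserted.
            self._store[self._next_id] = message
            self._next_id += 1
            self._buffer.task_done()

    def flush(self):
        """Block until every queued message has been inserted."""
        self._buffer.join()
```

Producers on any number of threads call post(); because only the drain thread touches the store, an observer can never be handed a marker that skips over a lower, not-yet-inserted ID.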

c. Markers can be decoupled from IDs, such that a marker is a vector of IDs, each ID being associated with a particular storage shard. In this solution, individual shards may use local ID counters with the only caveat that they be monotonic. The advantage of this approach is that counters do not need to be synced between machines. On the other hand, using a vector marker requires sophisticated storage partitioning and a dynamic schema designed to allow collection resources to be addressable across as few or as many storage nodes as needed to elastically balance storage resources. Furthermore, a vector of IDs DOES NOT guarantee FIFO!
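A vector marker can be sketched as a mapping from shard name to the last local ID seen on that shard. Everything here is hypothetical: the shard names, the in-memory lists, and the assumption that local IDs start at 1.

```python
def list_messages(shards, marker=None):
    """shards: shard name -> list of (local_id, message), ascending by local_id.
    marker: shard name -> last local ID already seen on that shard."""
    new_marker = dict(marker or {})
    page = []
    for name in sorted(shards):
        last_seen = new_marker.get(name, 0)  # assumption: local IDs start at 1
        for local_id, message in shards[name]:
            if local_id > last_seen:
                page.append(message)
                new_marker[name] = local_id
    return page, new_marker


shards = {"S1": [(1, "M1")], "S2": []}
page1, marker = list_messages(shards)          # M1 only
shards["S2"].append((1, "M2"))                 # M2 is posted first...
shards["S1"].append((2, "M3"))                 # ...then M3
page2, marker = list_messages(shards, marker)  # M3 is returned before M2
```

No counter is shared between S1 and S2 and nothing is skipped or repeated, but the second page lists M3 ahead of M2 even though M2 was posted first, which illustrates the FIFO caveat.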