Jump to: navigation, search


  • Launchpad Entry: CeilometerSpec:move-listener-framework-oslo
  • Created: 05 Feb 2013
  • Contributors: Doug Hellmann


Ceilometer and Quantum use private methods of the RPC connection object to configure themselves to listen to a queue shared among a group of workers. This blueprint discusses how we can make that API public.

Release Note


Ceilometer and Quantum are using private methods of the RPC Connection class, which restricts them to the kombu or qpid implementations. We want to make the API public to facilitate implementations in the other RPC drivers.

User stories

Ceilometer wants to listen to notifications from all of the other OpenStack components in the collector process. A deployer needs to be able to run multiple copies of the collector, to spread the processing load of the incoming notification messages. No message should be processed more than one time, so the existing fanout subscription model does not work. The messages are not wrapped in RPC envelopes, so the ProxyCallback class used by the create_worker() API does not work (and we can't just create a different dispatcher).


  1. The API should not make any assumptions about the format of incoming messages beyond the usual signing and JSON serialization assumptions made for all other messages.
  2. The API should not make any assumptions about which control exchange should be used for listening to incoming messages.
  3. The API should not make any assumptions about the queue name to be used for listening to incoming messages.


A new method on the Connection class will avoid backwards-compatibility issues.

def join_consumer_pool(self, callback, pool_name, topic, exchange_name=cfg.CONF.control_exchange):
    """Register as a member of a group of consumers for a
    given topic from the specified routing exchange.

    Exactly one member of a given pool will receive each message.

    A message will be delivered to multiple pools, if more than one is created.

In the AMQP drivers, the callback will be wrapped with a CallbackWrapper, based on the ProxyCallback class but less directly tied to a specific dispatching behavior:

class CallbackWrapper(object):
    def __init__(self, conf, callback, connection_pool):

    def __call__(self, message_data):
        """Spawn a GreenPool thread to invoke the callback with the message_data."""

    def wait(self):
        """Wait for all callback threads to exit."""

It should be possible to create a base class for both common.ProxyCallback and common.CallbackWrapper to share the wait and pool management behavior.

The ZMQ driver isn't using ProxyCallback right now, but does have a similar set of classes. A version of CallbackWrapper for the ZMQ driver will need to be created.

Dispatching based on the notification type can be implemented by a separate NotificationDispatcher class that also has a call method, so an instance can be passed to join_consumer_pool() just as if it was any other callback. Users of NotificationDispatcher would subclass it to provide methods with names based on the notification name similar to the version currently found in Quantum (https://github.com/openstack/quantum/blob/master/quantum/agent/rpc.py#L90). That implementation isn't completely appropriate for reuse because it is also handling the subscription and consumption logic.


UI Changes


Code Changes

The AMQP implementations (kombu and qpid) will be based on their declare_topic_consumer() methods (this is the private method Ceilometer and Quantum have been using). A queue will be created using the pool name, and attached to the exchange so it receives messages for the specified topic. When each message is received the callback will be invoked with the entire message as the argument.


  1. Ceilometer will be updated to use the new method as it is being developed as a way to test it.
  2. The new method will be added to oslo-incubator.
  3. Ceilometer's copy of the rpc library from oslo will be updated.

Test/Demo Plan

This need not be added or completed until the specification is nearing beta.

Unresolved issues

  1. I am not sure how to implement this for ZMQ.