Jump to: navigation, search


The Oslo messaging library provides two independent APIs:

  1. oslo.messaging.rpc for implementing client-server remote procedure calls
  2. oslo.messaging.notify for emitting and handling event notifications

They are part of the same library for mostly historical reasons - the most common transport mechanisms for oslo.messaging.notify are the transports used by oslo.messaging.rpc.

NOTE: the below article is a design document which pre-dates the actual implementation of the library. The implementation closely mirrors the design and so the article is still useful background, but please see the the full API documentation for more details.


As OpenStack evolves, messaging is becoming increasingly important to the project and we continue to design our projects' architectures around the use of RPC in particular.

However, the RPC API is currently heavily based on AMQP concepts and imposes very little in the way of recommended usage. This is quite frustrating for those designing the relationship between components within a project but who don't necessarily wish to understand AMQP in any great detail. Poorly designed intra-service interfaces have the potential to be a real issue for us because these interfaces must evolve in backwards compatible ways or risk causing serious pain for operators during upgrades.

Also, while the two most used implementations of the API - kombu/rabbitmq and qpid - are both AMQP implementations, we expect the zmq implementation to be joined over time by other non-AMQP implementations. Each time a project uses the RPC API in an AMQP specific way, the non-AMQP implementations will be further set back. We need a clean abstraction so that each new messaging pattern a service wishes to adopt must be modelled in the abstraction in a way that non-AMQP implementations can support.

Finally, we wish to publish these APIs as a proper standalone library but do not wish to lock ourselves into retaining backwards compatibility for the current API.

This proposal won't initially cover all of the use cases required for e.g. ceilometer and cells, but it will do so eventually. We just need to initially focus on getting the design right for some of the more common use cases.

Current State

The API design has moved on somewhat from what is described below and is implemented in the oslo.messaging repo. See also the API docs generated from inline docstrings.

At this point, the API design is pretty mature but we do not yet have any transport drivers implemented.


The RPC API defines semantics for addressing, procedure calls, broadcast calls, return values and error handling.

There are multiple backend transport drivers which implement the API semantics using different messaging systems - e.g. RabbitMQ, Qpid, ZeroMQ. While both sides of a connection must use the same transport driver configured in the same way, the API avoids exposing details of transports so that code written using one transport should work with any other transport.

The exception to this principle of abstraction is that the API exposes transport-specific configuration for how to connect to the messaging system itself.


  • Server: servers makes an RPC interfaces available to clients
  • Client: clients invoke methods on servers
  • Exchange: containers within which each project's topics are scoped
  • Topic: a topic is a identifier for an RPC interface; servers listen for method invocations on a topic; clients invoke methods on a topic
  • Namespace: servers can expose multiple sets of methods on the one topic, where each set of methods is scoped under a namespace. There is a default null namespace.
  • Method: a method has a name and a number of named (i.e. "keyword") arguments
  • API version: each namespace has a version number associated with it which is increased each time the interface is changed. Backwards compatible changes are indicated by an increased minor number and incompatible changes are indicated by an increased major number. Servers can support multiple incompatible versions at once. Clients can request the minimum version they require when invoking a method.
  • Transport: an underlying messaging system which delivers the RPC requests to servers and return the replies to clients without the clients and servers needing to know any of the specifics of the messaging system

Use Cases

Note: if a use case appears to be missing in the list below, consider whether that use case is actually covered by the oslo.messaging.notify API. If so, then that use case must be supported internally by oslo.messaging, but not necessarily supported by the public facing oslo.messaging.rpc API. Yes, that's confusing.

Invoke Method on One of Multiple Servers

This is where we have multiple servers listening for method invocations on a topic within an exchange, where both the exchange and topic names are well known to clients. When a client invokes a method on this topic, the invocation gets dispatched to one of the listening servers in round robin fashion.

Examples of this use case:

  • nova-api invokes the 'run_instance' method on the 'scheduler' topic in the 'nova' exchange and one of the nova-scheduler services handles it
  • nova-compute invokes the 'instance_update' method on the 'conductor' topic in the 'nova' exchange and one of the nova-conductor services handles it
  • nova-api invokes the 'add_dns_entry' method on the 'network' topic in the 'nova' exchange and one of the nova-network services handles it
  • cinder-api invokes the 'run_instance' method on the 'scheduler'[1] topic in the 'cinder' exchange and one of the cinder-scheduler services handles it
  • quantum-linuxbridge-agent invokes the 'update_device_up' method on the 'plugin'[2] topic in the 'quantum' exchange and one of the quamtum-server services handles it in the linuxbridge plugin
  • quantum-dhcp-agent invokes the 'get_dhcp_port' method on the 'q-plugin' topic in the 'quantum' exchange and one of the quantum-server services handles it
  • quantum-lbaas-agent invokes the 'plug_vip_port' method on the 'q-loadbalancer-plugin' topic in the 'quantum' exchange and one of the quantum-server services handles it
  • heat-api invokes the 'identify_stack' method on the 'engine' topic in the 'heat' exchange and the heat-engine service handles it
[1] - cinder currently uses the topic name 'cinder-scheduler' because of bug #1173552
[2] - quantum currently uses the topic name 'q-plugin', but there's probably no reason to prefix it like that
[3] - this will change when Heat gains support for multiple heat-engine servers

Invoke Method on a Specific Server

This is where we have multiple servers listening for method invocations on a topic within an exchange, where both the exchange and topic names are well known to clients. However, in this case, the client wishes to invoke a method on a specific one of these servers.

Examples of this use case:

  • nova-scheduler chooses 'foobar' as the host to run a new instance so it invokes the 'run_instance' method on the 'foobar' server listening on the 'compute' topic in the 'nova' exchange and the nova-compute service running on 'foobar' launches the instance
  • nova-api receives a request to terminate an instance running on the host 'foobar' so it invokes the 'terminate_instance' method on the 'foobar' server listening on the 'compute' topic in the 'nova' exchange and the nova-compute service running 'foobar' on handles it
  • the nova-dhcpbridge dnsmasq helper runs on host 'foobar' when a DHCP lease expires and it invokes the 'release_fixed_ip' on the 'foobar' server listening on the 'network' topic in the 'nova' exchange and the nova-network service on same host handles it
  • cinder-scheduler chooses 'foobar' as the volume node on which to create a new volume so it invokes the 'create_volume' method on the 'foobar' server listening on the 'volume'[1] topic in the 'cinder' exchange and the cinder-volume service running on 'foobar' handles it
  • quantum-server invokes the 'port_delete_end' on the 'foobar' server listening on the 'dhcp_agent' topic in the 'quantum' exchange and the quantum-dhcp-agent running on 'foobar' handles it
[1] - again, cinder currently uses the topic name 'cinder-volume' because of bug #1173552

Invoke Method on all of Multiple Servers

This is where we have multiple servers listening for method invocations on a topic within an exchange, where both the exchange and topic names are well known to clients. In this case, the client wishes to invoke a method on all of these servers.

  • nova-compute periodically invokes the 'update_service_capabilities' method in fanout mode on the 'scheduler' topic in the 'nova' exchange and all nova-scheduler services handle it
  • at startup, nova-scheduler invokes the 'publish_service_capabilities' method in fanout mode on the 'compute' topic in the 'nova' exchange and all nova-compute services handle it
  • when DNS entries need updating, nova-network invokes the 'update_dns' method in fanout mode on the 'network' topic in the 'nova' exchange and all nova-network services handle it
  • quantum-server invokes the 'network_delete' method in fanout mode on the 'q-agent-notifier-network-delete' topic in the 'quantum' exchange and all quantum-linuxbridge-agent services handle it

Types of Method Invocations

There are two ways that a method can be invoked:

  1. cast - the method is invoked asynchronously and no result is returned to the caller
  2. call - the method is invoked synchronously and a result is returned to the caller

Note that the caller need not be aware of whether a method will be invoked using a cast or a call.

Note also there's no mention of fanout casts here - "fanout" is a property of a method invocation's target rather than the type of invocation itself, however it's not possible to do a fanout call.

Finally, note that we previously had a concept of multicall where the method is invoked synchronously and multiple results are returned to the caller, each result returned as soon as it's ready. However, given that we don't have any current use cases for this, we're dropping support for it.


The API is abstract enough that it can be implemented by multiple messaging systems. Currently, we have two AMQP based implementations (kombu/rabbitmq and qpid) and a ZeroMQ implementation. These messaging systems can be thought of as transports.

The API does not expose any transport specific semantics, except for how transport specific configuration which describes how a client or server should "connect to" the transport.

Let's look at the configuration parameters for each transport we currently support.


The configuration parameters primarily describe how to connect to the RabbitMQ broker:

  • host
  • port
  • userid
  • password
  • virtual_host

We have some parameters which control our retry behaviour if a connection to a broker fails:

  • retry_interval
  • retry_backoff
  • max_retries

We have some SSL specific configuration parameters:

  • use_ssl
  • ssl_version
  • keyfile
  • certfile
  • ca_certs

We have some highly AMQP specific parameters which allow for configuring durable and/or mirrored AMQP queues:

  • durable_queues
  • ha_queues

The final interesting wrinkle is that we support configuring multiple RabbitMQ broker hostnames for the case where you're using a broker of clusters:

  • hosts = [$host]


Qpid has a very similar set of transport configuration parameters.

Basic broker connection information:

  • hostname
  • port
  • username
  • password
  • protocol (tcp or ssl)

More obscure configuration parameters:

  • sasl_mechanisms
  • heartbeat
  • nodelay

And, again, the support for multiple hosts in a cluster:

  • hosts = [$host]


Describing how we should listen for zmq connections:

  • bind_address
  • host
  • port
  • ipc_dir


  • contexts
  • topic_backlog

Matchmaker configuration:

  • matchmaker (i.e. which driver to use)

Specific to ringfile matchmaker:

  • ringfile (a path to a json file)

Specific to the redis matchmaker:

  • host
  • port
  • password
  • heartbeat_freq
  • heartbeat_ttl

Transport URLs

Interestingly, kombu supports supplying transport configuration parameters in the form of a URL:


It also uses query parameters for some options e.g.


We will use a similar model for describing transport configuration. In the degenerate case where none of the standard URL attributes have any meaning for a particular transport, we can do:


Since some transports will support a clustered scenario where the transport must be confgured with multiple distinct (but equivalent) configurations, so we will support configuring a transport with a set of URLs.


Methods are invoked on targets. Targets are described by the following parameters:

  • exchange (defaults to CONF.control_exchange)
  • topic
  • server (optional)
  • fanout (defaults to False)
  • namespace (optional)
  • API version (optional)

The use of asterisks in topic names has special meaning in AMQP where a server can bind to e.g. 'topic.subtopic.*' and receive messages sent to the 'topic.subtopic.foo' topic. It is this AMQP specific wildcard based topic matching on the server which we do not appear to use in OpenStack, so we should not allow such topic names on the server side in our API.

Note: perhaps it's not ridiculous to be able to include some of the target parameters in the transport URL (exchange? topic?) for the case where you want to have a "how to contact this other service" configuration parameter like:

glance_transport = kombu://me:secret@foobar:3232//glance/notifications

Client Side API

We use a client-side proxy class which encapsulates the sending of method invocations to a particular server-side interface

# in nova
class BaseAPIClient(messaging.RPCClient):

    def __init__(self, transport):
        target = rpc.Target(topic='blaa', version='1.0', namespace='baseapi')
        super(BaseAPIClient, self).__init__(transport, target)

    def ping(self, context, arg, timeout=None):
        cctxt = self.prepare(timeout=timeout)
        return cctxt.call('ping', context, arg)

    def get_backdoor_port(self, context, host):
        cctxt = self.prepare(version='1.1', server=host)
        return self.call('get_backdoor_port', context)
# in oslo.messaging.rpc.client
class _CallContext(object):

    def __init__(self, transport, target, timeout=None, ...):
        self.transport = transport
        self.target = target
        self.timeout = timeout

    def call(self, method, ...):
        print('calling %(topic)s.%(method)s version %(version)s' %
              dict(topic=self.target.topic, method=method,
        msg = self._make_message(method, ...)
        return self.transport.send(self.target, msg, wait_for_reply=True)

class RPCClient(object):

    def __init__(self, transport, target, ...):
        self.transport = transport
        self.target = target
        super(RPCClient, self).__init__()

    def prepare(self, server=None, version=None,
                timeout=None, ...):
        target = self.target(server=server, version=version)
        return _CallContext(self.transport, target, timeout, ...)

    def call(self, method, **kwargs):
        return self.prepare().call(method, **kwargs)

    def cast(self, method, **kwargs):
        self.prepare().cast(method, **kwargs)

The main thing we're avoiding here is the caller ever seeing the message payload which will be sent on the wire. It should be completely up to the transport driver to decide the on-the-wire message format and the API user should never be thinking in terms of the messages themselves. If we can limit the call() arguments to the method and the method arguments, then we don't need a make_msg() method like we have now.

Server Side API

On the server-side, we have a server object which encapsulates listening on the (exchange, topic, host) portion of a target and allow registering API endpoints for each (namespace, version) exposed.

Note that it is transparent to the server whether a client invoked a method by narrowing the target to a specific host or by widening it to fanout mode.

server = eventlet.EventletRPCServer(transport, target, [manager, baseapi])

The base server class would look like:

# in oslo.messaging
class MessageHandlingServer(object):

    def __init__(self, transport, target, dispatcher, executor_cls):
        self.transport = transport
        self.target = target
        self.dispatcher = dispatcher

        self._executor_cls = executor_cls
        self._executor = None

        super(MessageHandlingServer, self).__init__()

    def start(self):
        if self._executor is not None:
        listener = self.transport._listen(self.target)
        self._executor = self._executor_cls(..., listener, self.dispatcher)

    def stop(self):
        if self._executor is not None:

    def wait(self):
        if self._executor is not None:
        self._executor = None

# in oslo.messaging.rpc
class RPCServer(server.MessageHandlingServer):

    def __init__(self, transport, target, endpoints, executor_cls):
	super(RPCServer, self).__init__(transport,

class BlockingRPCServer(RPCServer):

    def __init__(self, transport, target, endpoints):
	executor_cls = impl_blocking.BlockingExecutor
	super(BlockingRPCServer, self).__init__(transport,

# in oslo.eventlet
class EventletRPCServer(server.RPCServer):

    def __init__(self, transport, target, endpoints):
        executor_cls = impl_eventlet.EventletExecutor
	super(EventletRPCServer, self).__init__(transport,

The idea here is that the server is implemented using two internal concepts - dispatchers and executors. The dispatcher looks at the incoming message payload and invokes the appropriate method. The executor represents the strategy for polling the transport for incoming messages and passing them to the dispatcher. These two abstractions allow us to use the same server implementation with multiple dispatchers (e.g. for rpc and notifications) and multiple executors (e.g. blocking and eventlet).

What's particularly important here is that we're not encoding a dependency on eventlet in the transport drivers, leaving us room to switch to something else in the future.

Method invocations get dispatched to API endpoints which encapsulate a remotely invocable interface (i.e. set of methods) under a namespace and associated with a version:

# in nova
class BaseAPI(object):

    target = rpc.Target(namespace='baseapi', version='1.1')

    def __init__(self, server, service_name, backdoor_port):
        self.conf = server.conf
        self.server = server
        self.service_name = service_name
        self.backdoor_port = backdoor_port

      def ping(self, context, arg):
          return dict(service=self.service_name, arg=arg)

      def get_backdoor_port(self, context):
          return self.backdoor_port

Note: the suggestion of an (optional?) rpc.expose decorator for explicitly marking methods as available via the RPC interface. We have plenty of examples where we'd benefit from more clarity around which methods we intend to be remotely invokable.

Note also: if the API endpoint must be able to reference the server - e.g. to refer to the ConfigOpts object representing the configuration file we're using or, in the case of a blocking server, stopping the server from a callback - then you'd pass the server object to the API endpoint constructor as above.

API Version Negotiation

Describe how the client/server API is versioned and if the server does not implement the version required by the client, then UnsupportedRpcEnvelopeVersion is raised.

Transport Driver API

We should move away from having a global transport driver object in the messaging library and instead require API users to construct a driver and pass it to the library where needed. API users can choose to have their own global driver object for convenience.

Transport driver classes should be registered as entry points:

# in oslo.messaging.drivers
NAMESPACE = 'oslo.messaging.drivers'

    'rabbit = oslo.messaging._drivers.impl_rabbit:RabbitDriver',
    'qpid = oslo.messaging._drivers.impl_qpid:QpidDriver',
    'zmq = oslo.messaging._drivers.impl_zmq:ZmqDriver',

    # To avoid confusion
    'kombu = oslo.messaging._drivers.impl_rabbit:RabbitDriver',

    # For backwards compat
    'openstack.common.rpc.impl_kombu ='
    ' oslo.messaging._drivers.impl_rabbit:RabbitDriver',
    'openstack.common.rpc.impl_qpid ='
    ' oslo.messaging._drivers.impl_qpid:QpidDriver',
    'openstack.common.rpc.impl_zmq ='
    ' oslo.messaging._drivers.impl_zmq:ZmqDriver',

# in setup.py
entry_points = {

And the class looked up with the 'rpc_backend' configuration option:

  from stevedore import driver

  def get_transport(conf, url=None):
      if url is not None:
          rpc_backend = urlparse.urlparse(url).scheme
          rpc_backend = conf.rpc_backend
      kwargs = ...
      mgr = driver.DriverManager(drivers.NAMESPACE,
      return Transport(mgr.driver)

The driver returned would be an implementation of the BaseDriver abstract base class:

class BaseDriver(object):

    __metaclass__ = abc.ABCMeta

    def __init__(self, conf, url=None, default_exchange=None):
	self.conf = conf
        self._url = url
	self._default_exchange = default_exchange

    def send(self, target, message, wait_for_reply=None, timeout=None):

    def listen(self, target):


Whether the context should enjoy its current status in the API was a subject of some debate during the Havana design summit session. I've no opinion yet :)

Error Handling

Describe what happens from an API user's point of view what happens when the server side raises an exception

Details to consider:

  • allowed_rpc_exception_modules configuration
  • falling back to RemoteError if we can't deserialize
  • bizarreness with setting the type to Remote_$type
  • @client_exceptions() decorator and logging
  • timeouts - e.g. what raises the rpc_common.Timeout exception


The configuration options registered by the library are part of the API. For example, if we decided to rename 'control_exchange' to 'topic_container' then any code which references CONF.control_exchange would break. We can probably safely say that config options specific to a transport driver are private and say that we don't support API users to depend on those? Or perhaps we just document which options are part of the API and warn people not to rely on the others?

We have some transport-agnostic config options:

  • rpc_backend - choose the transport driver
  • control_exchange - allows e.g. having two heat deployments using the same messaging broker, simply by doing control_exchange = 'heat1'

Then some tuning parameters:

  • rpc_thread_pool_size
  • rpc_conn_pool_size
  • rpc_response_timeout
  • rpc_cast_timeout

Then we have a config option which should simply be part of the API, since users should never have to configure it:

  • allowed_rpc_exception_modules

And this bizaroness:

  • fake_rabbit


The notify API defines semantics for emitting and handling event notifications.

As for the RPC API, there are multiple backend drivers which implement the API semantics using different transport mechanisms. By far the most widely used backend driver notifications is the RPC driver which sends notifications over the same messaging system as RPC messages. However, it's certainly possible that we could have a future notifications driver using a transport that would not make sense to use for RPC messages.

Use Cases

It's important to contrast the use cases of notifications with RPCs. Historically, we have limited their use to cases where we want to asynchronously signal events to third parties.

Ceilometer Metering Messages

Doug's use case:

6. An application must be able to listen for messages sent by any peer, and any service (i.e., specifying a different "exchange", though we want to extend this to include a different broker as well).

could be described as:

Invoke Method on one Server in each of Multiple Pools of Servers


Insert a Tap to Listen To Method Invocations on a Pool of Servers

The use case in Ceilometer appears to be:

  • a ceilometer-collector processes a notification from glance-api and invokes the 'record_metering_data' on the 'metering' topic in the 'ceilometer' exchange and one of the ceilometer-collector services handles the method ... but another third party service also handles it?

There seems to be some overlap with notifications here - using RPC from ceilometer code (whether it be code that runs in ceilometer-collector or code which gets plugged into e.g. nova-compute) to send metering data to one of the ceilometer-collector services feels right. But sending metering data out for unknown third party listeners feels like we should be using notifications.

i.e. for sending metering data to ceilometer-collector, we have ceilometer code using an internal ceilometer RPC API. However, for making metering data available to third parties, why not publish them using notifications?

As discussed, Ceilometer will move to just sending metering data out as notifications.

Searchlight Resource Indexing

Searchlight indexes information (currently into Elasticsearch) about resources across the various Openstack services to provide a unified search and caching layer for Horizon and other consumers. It listens to notifications to determine when resources have changed. Where possible the notifications alone are used to index a resource, and we are trying to encourage services to put as much information as possible into their notifications.

Emitting Notifications

Notification messages have a well-defined format which notification consumers rely upon. An example notification message might be:

  'message_id': '5c329c20-d435-444c-8fa2-e1b54592219c',
  'publisher_id': 'compute.host1',
  'event_type': 'compute.instance.exists',
  'priority': 'INFO',
  'payload': {'user_id': ..., 'tenant_id': ..., 'instance_id', ..., ...}
  '_context_user_id': None,
  '_context_project_id': None,
  '_context_is_admin': True,

We have made a commitment to the stability of this message format, so the first goal for this API has to be the ability to send this exact format.

We should aim to encourage is project-specific APIs for sending related notifications, for example:

class ImageNotifier(notify.Notifier):

    def __init__(self, conf):
        super(ImageNotifier, self).__init__(conf, publisher_id=conf.host)

  def add(self, context, image):
      payload = self._generate_image_payload(mage)
      self.notifier.info(context, 'image.update', payload)

A base class would facilitate this:

class Notifier(object):

    def __init__(self, conf, publisher_id, driver=None,
                 transport=None, topic=None):
        self.conf = conf
        self.publisher_id = publisher_id
        self._drivers = drivers if driver is not None else conf.notification_driver
        self._topics = [topic] if topic is not None else conf.notification_topics

    def debug(self, context, event_type, payload):

    def info(self, context, event_type, payload):

    def warn(self, context, event_type, payload):

By adding a method for each priority level we make it much more explicit which levels are available. We allow the API user to specify a notification driver, messaging transport and/or topic but these would usually come from configuration.

We will continue to support existing configuration options:

  • notification_driver: a list of drivers which should be used to emit notifications. Currently the values in this list are full python module paths but we will switch to using entry point names instead, while still supporting the previous names. The drivers available now are 'rpc', 'rpc2', 'log', 'no-op' and 'test'. The two 'rpc' drivers are the most commonly used and it probably makes more sense to rename them to e.g. 'messaging'. Note, the 'rpc2' driver uses a new on-the-wire message format and the 'rpc' driver is deprecated but will continue to be supported for the foreseeable future.
  • notification_topics: a list of topic names over which notification messages will be sent. This defaults to 'notifications' and, in practice, the topic names are actually e.g. 'notifications.info', 'notifications.error' so this is more about configuring the topic prefix then the actual topic name.

Handling Notifications


The RPC notification driver currently uses a topic like 'notifications.info' which means a consumer of these notifications would need to use a wildcard based topic to pick up all notifications. We probably want to allow for RPC drivers which don't have a wildcard based topic matching mechanism, so we'll want to explicitly pass the topic and subtopic to the driver when sending.

The issue is that there is currently a 1:1 mapping of queue name to routing key. Ideally we'd like to have a single queue <notifications> and issue different routing keys to it based on the notification priority. This single queue should be able to have events for .info, .error, .warn, etc (vs separate queues for each priority). This way, we can use a single consumer binding for all priorities. The problem is compounded when we need to publish to multiple downstream consumers (let's say, ceilometer and billing). Now we've got 3 queues per downstream, or 6 queues. Wildcard topic matching is actually a good thing. --Sandy

Further Discussions


Have we covered the cells use case? It looks like we just need to be able to connect to another broker and send a description of a method which we want the neighbouring cell's nova-cells service to invoke?