Jump to: navigation, search

Python Client Library (Marconi)

This document exists to establish the feel of working with the python Zaqar client. It serves as both a vision and a direction for implementors. Check out the blueprints for more details.

If you have questions, reach us at freenode.irc.net #openstack-zaqar!


The client library is available on PyPI: https://pypi.python.org/pypi/python-marconiclient/

  • Certificate verification
  • Reauthentication on token expiration
  • Async operations
  • Full coverage of Zaqar API




  • Client: handles account-wide operations - queue retrieval, etc.
  • Queue: gives access to some metadata ops, as well as message handling
  • Message: gives access to properties of message, as well as deletion
  • Claim: Collection of claimed messages - handle querying, updating, and deleting


  • Connection: handles authentication with Keystone, acquires Zaqar endpoint, handles requests and networking logic


  • ErrorBase: the foundation for Zaqar client specific errors

API Synopsis

See Zaqar/specs/api/v1

Client API Synopsis

# Given the following vars defined with the following types:
# - Client:client
# - Subscriber:sub
# - Queue:queue
# - Message:message
# - Claim:claim
>>> dir(client)
['async', 'create_queue', 'home', 'queues']
>>> dir(sub)
['channels', 'listen', 'subscribe', 'unsubscribe']
>>> dir(queue)
['claim', 'delete', 'href', 'messages', 'metadata', 'name', 'post', 'stats']
>>> dir(metadata)
['content', 'reload', 'update']
>>> dir(claim)
['delete', 'grace', 'href', 'messages', 'patch', 'release', 'ttl']
>>> dir(message)
['age', 'body', 'delete', 'href', 'reload', 'status', 'ttl']


Working With a Connection

This section has been mostly simplified by the inclusion of the common lib BaseClient. What Openstack calls a "Client" we'll call a Connection. The reason for this is that the connection handles networking-level details: puts, patches, deletes, posts, etc. What we call a client is interested in higher level details, like queues and claims.

The interface is as follows, based on code reading:

    >>> # given a class zaqarclient.connection implemented using zaqarclient.common.apiclient.client
    >>> from zaqarclient import connection
    >>> conn = connection.Connection(auth_plugin=...,
                                     username='tacocat', password='queue_master')
    >>> dir(conn)
    ['client_request', 'head', 'get', 'post', 'put', 'delete', 'patch', 'get_class']

The hope is to be able to update the Http client to use enum34, detailed below. It's good enough for now, though.

See enum34 and PEP 435 for more information on Python enumerations.

Client Operation

    >>> client = Connection(async=False)
    >>> client.queues(marker=..., limit=10, detailed=False)
    <generator object <genexpr> ar 0x7fd3ef1ed730>
    >>> client.create_queue(name='wot')
    <ZaqarQueue [wot]>
    >>> client.home
    <HomeDoc ...>
    # affects all operations for objects acquired from the client
    >>> client.async

Setting Up a Pub/Sub Connection

This borrows some ideas from redis-py:

    >>> sub = client.subscriber()
    >>> sub.<TAB>
    sub.channels  sub.listen    sub.subscribe    sub.unsubscribe
    >>> sub.channels
    >>> sub.subscribe('darn_good_queue')
    >>> sub.channels
    >>> for msg in sub.listen():
            # blocks until a message arrives in any of the subscribed queues
            # polling implementation by default

Queue Handling

    >>> queue = next(q for q in client.queues if q.name == 'tacocat')
    >>> queue.name
    >>> queue.href
    >>> queue.stats
    # a dictionary derived from a JSON response
    >>> queue.messages(include_claimed=False)
    <generator object <genexpr> ar 0x7fd3ef1ed742>
    >>> queue.messages(ids=[50b68a50d6f5b8c8a7c62b01, 50b68a50d6f5b8c8a7c62b02],
                               claim=a28ee94e-6cb4-11e2-b4d5-7703267a7926, limit=1)
    <generator object <genexpr> ar 0x7fd3ef1ed742>
    >>> queue.post_messages(messages=...)
    >>> queue.metadata
    <Metadata ...>  # fetches metadata from API, returns a Metadata controller
    >>> queue.claim(limit=10)
    <Claim size:8 ...>  # size: actual number of messages received
    >>> queue.delete()

Queue Metadata Handling

    >>> meta = queue.metadata
    >>> meta.update({'max_size': 1000})  # communicate with API, replaces
    >>> meta.reload()  # gets most recent attributes from API

Message Handling

    >>> message = next(queue.messages(...))
    >>> message.age
    >>> message.ttl
    >>> message.href
    >>> message.body
    {u'action': u'win'}
    >>> message.reload()
    >>> message.delete()
    >>> message.status
    <EnumValue: Message.Free [value=1]>

Claim Management

>>> claim = queue.claim(limit=10)
>>> claim.messages(...)
<generator object <genexpr> ar 0x7fd3ef1ed742>
>>> msg = next(claim.messages(...))
>>> msg.status
<EnumValue: Message.Claimed [value=2]>
>>> claim.href
>>> claim.ttl
>>> claim.grace
>>> claim.patch(ttl=..., grace=...)
>>> claim.delete()

Error Management

The wiki gives a thorough explanation of Zaqar Errors. Error handling at the client-level is a matter of transforming responses returned by Zaqar into exceptions that are meaningful to users.

Here's a quick mock up of error usage at the level of the client:

    >> error = zaqar.error.ErrorBase()
    >>> error.title
    >>> error.description
    >>> error.code
    >>> raise error
    ErrorBase (error.code): error.title


This section is more top-down than the rest. Consider this what it feels like to spin up python-zaqarclient in an ipython environment:

    >>> from zaqarclient.connection import Connection
    >>> from zaqarclient.client import Client
    >>> client = Connection(auth_url='https://keystone.example.com/', username='me', password='win')
    >>> client.create_queue('wot')
    >>> queue = next(client.queues())
    >>> queue.post_messages(messages=[{'event': {'data': 'winning', 'score': 10}})
    >>> message = next(queue.messages())
    >>> message.body
    {'event': {'data': 'winning', 'score': 10}}
    >>> message.status
    >>> queue.stats
    >>> claim = queue.claim(1)
    >>> message = next(claim.messages())
    >>> message.status
    >>> message
    <Message ttl:120>
    >>> message.delete()
    >>> claim
    <Claim size:1>
    >>> claim.delete()
    >>> queue
    <Queue [wot]>
    >>> queue.delete()
    >>> client.async