Python Client Library (Marconi)
This document exists to establish the feel of working with the Python Marconi client. It serves as both a vision and a direction for implementors. Check out the blueprints for more details.
If you have questions, reach us in #openstack-marconi on irc.freenode.net!
Features
- Certificate verification
- Reauthentication on token expiration
- Async operations
- Full coverage of Marconi API
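To make the reauthentication feature above concrete, here is a minimal sketch of one way it could behave: retry a request once after fetching a fresh token. It is not the client's implementation. `TokenExpired`, `FakeTransport`, and `request_with_reauth` are hypothetical names, and the in-memory transport stands in for Keystone and Marconi.

```python
class TokenExpired(Exception):
    """Hypothetical error raised when the service rejects a stale token."""


class FakeTransport(object):
    """In-memory stand-in for Keystone plus Marconi (an assumption of this sketch)."""

    def __init__(self):
        self.valid_token = 'token-1'
        self.auth_calls = 0

    def authenticate(self):
        # Each authentication issues a fresh token and invalidates the old one.
        self.auth_calls += 1
        self.valid_token = 'token-%d' % (self.auth_calls + 1)
        return self.valid_token

    def send(self, token, method, path):
        if token != self.valid_token:
            raise TokenExpired(path)
        return {'method': method, 'path': path}


def request_with_reauth(transport, token, method, path):
    """Send a request; on token expiry, reauthenticate once and retry.

    Returns (response, token) so the caller can keep the refreshed token.
    """
    try:
        return transport.send(token, method, path), token
    except TokenExpired:
        token = transport.authenticate()
        return transport.send(token, method, path), token


transport = FakeTransport()
response, token = request_with_reauth(transport, 'stale-token', 'GET', '/v1/queues')
print(response, token, transport.auth_calls)
```

Retrying only once keeps a genuinely bad credential from looping forever: a second failure propagates to the caller.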
Classes
Controllers
- Client: handles account-wide operations - queue retrieval, etc.
- Queue: gives access to some metadata ops, as well as message handling
- Message: gives access to properties of message, as well as deletion
- Claim: Collection of claimed messages - handle querying, updating, and deleting
Core
- Connection: handles authentication with Keystone, acquires Marconi endpoint, handles requests and networking logic
Utility
- ErrorBase: the foundation for Marconi client specific errors
API Synopsis
Usage
Working With a Connection
Including the common library's BaseClient has simplified most of this section. What OpenStack calls a "Client" we call a Connection, because it handles networking-level details: puts, patches, deletes, posts, and so on. What we call a client deals with higher-level concepts, such as queues and claims.
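The split between the two layers can be sketched as follows. This is an illustration under assumptions, not the planned code: `RecordingConnection` is a hypothetical stand-in that records HTTP verbs instead of sending them, and the `Client` and `Queue` classes here implement only what the example needs.

```python
class RecordingConnection(object):
    """Hypothetical connection: records HTTP calls instead of sending them."""

    def __init__(self):
        self.calls = []

    def put(self, path, body=None):
        self.calls.append(('PUT', path, body))
        return 201

    def delete(self, path):
        self.calls.append(('DELETE', path, None))
        return 204


class Queue(object):
    """Higher-level object; delegates all networking to the connection."""

    def __init__(self, conn, name):
        self._conn = conn
        self.name = name
        self.href = '/v1/queues/' + name

    def delete(self):
        return self._conn.delete(self.href)

    def __repr__(self):
        return '<Queue [%s]>' % self.name


class Client(object):
    """Speaks in queues; leaves verbs and networking to the connection."""

    def __init__(self, conn):
        self._conn = conn

    def create_queue(self, name):
        self._conn.put('/v1/queues/' + name)
        return Queue(self._conn, name)


conn = RecordingConnection()
client = Client(conn)
queue = client.create_queue('wot')
queue.delete()
print(queue, conn.calls)
```

Because the client only ever calls verbs on the connection, a recording or fake connection like this one is enough to test the higher layer without a network.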
The interface is as follows, based on code reading:
>>> # given a class marconiclient.connection implemented using marconiclient.common.apiclient.client
>>> from marconiclient import connection
>>> conn = connection.Connection(auth_plugin=..., username='tacocat', password='queue_master')
>>> dir(conn)
['client_request', 'head', 'get', 'post', 'put', 'delete', 'patch', 'get_class']
The hope is to update the HTTP client to use enum34 (see below). The current interface is good enough for now, though.
See enum34 and PEP 435 for more information on Python enumerations.
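As a brief illustration of what a PEP 435 enumeration looks like, the sketch below uses the standard-library `enum` module (Python 3.4+), which enum34 backports to older versions. `MessageStatus` is a hypothetical name. Its members and values are taken from the `Message.Free` and `Message.Claimed` reprs that appear in this document's message examples, and using an enum for message status here (rather than in the HTTP client) is an assumption.

```python
from enum import Enum


class MessageStatus(Enum):
    # Hypothetical enum; values mirror the Free/Claimed reprs in this document.
    FREE = 1
    CLAIMED = 2


status = MessageStatus(2)                 # look up a member by value
print(status is MessageStatus.CLAIMED)    # members are singletons
print(MessageStatus['FREE'].value)        # look up a member by name
print([member.name for member in MessageStatus])
```

Comparing members with `is` rather than comparing raw integers means a stray `2` elsewhere in the code can never be mistaken for a status.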
Client Operation
>>> client = Connection(async=False)
>>> client.queues(marker=..., limit=10, detailed=False)
<generator object <genexpr> at 0x7fd3ef1ed730>
>>> client.create_queue(name='wot')
<MarconiQueue [wot]>
>>> client.home
<HomeDoc ...>
>>> # affects all operations for objects acquired from the client
>>> client.async
False
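A listing call that takes `marker` and `limit` and returns a generator suggests lazy paging: fetch one page, yield its items, and request the next page only when the caller keeps iterating. The sketch below shows that pattern under stated assumptions. `iter_pages` and `fetch_page` are hypothetical, and the shape of a page (a list of items plus a next marker) is invented for the example rather than taken from the Marconi API.

```python
def iter_pages(fetch_page, marker=None, limit=10):
    """Lazily yield items, requesting a new page only when one runs out.

    fetch_page(marker, limit) is a hypothetical callable returning
    (items, next_marker); a next_marker of None ends the listing.
    """
    while True:
        items, marker = fetch_page(marker, limit)
        for item in items:
            yield item
        if marker is None or not items:
            return


# In-memory stand-in for a queue listing endpoint.
NAMES = ['a', 'b', 'c', 'd', 'e']
markers_seen = []


def fetch_page(marker, limit):
    markers_seen.append(marker)
    start = 0 if marker is None else NAMES.index(marker) + 1
    page = NAMES[start:start + limit]
    more = start + limit < len(NAMES)
    return page, (page[-1] if more else None)


queues = iter_pages(fetch_page, limit=2)
first = next(queues)      # triggers only the first page request
rest = list(queues)       # drains the remaining pages
print(first, rest, markers_seen)
```

Only one request is made before the first item is available, so a caller that stops early never pays for pages it does not read.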
Setting Up a Pub/Sub Connection
This borrows some ideas from redis-py:
>>> sub = client.subscriber()
>>> sub.<TAB>
sub.channels  sub.listen  sub.subscribe  sub.unsubscribe
>>> sub.channels
set([])
>>> sub.subscribe('darn_good_queue')
>>> sub.channels
set(['darn_good_queue'])
>>> for msg in sub.listen():
...     # blocks until a message arrives in any of the subscribed queues
...     # polling implementation by default
<Ctrl-C>
>>>
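The "polling implementation by default" could look roughly like the sketch below. It is only one possible shape: `PollingSubscriber` and its `fetch` callable are hypothetical, and an in-memory backlog stands in for the Marconi API.

```python
import time


class PollingSubscriber(object):
    """Hypothetical subscriber that polls each subscribed queue in turn."""

    def __init__(self, fetch, interval=1.0, sleep=time.sleep):
        self._fetch = fetch          # callable: queue name -> list of messages
        self._interval = interval
        self._sleep = sleep
        self.channels = set()

    def subscribe(self, name):
        self.channels.add(name)

    def unsubscribe(self, name):
        self.channels.discard(name)

    def listen(self):
        """Yield (queue name, message) pairs; sleep when every queue is empty."""
        while self.channels:
            found = False
            for name in sorted(self.channels):
                for message in self._fetch(name):
                    found = True
                    yield name, message
            if not found:
                self._sleep(self._interval)


# In-memory backlog standing in for the Marconi API.
backlog = {'darn_good_queue': [{'action': 'win'}]}


def fetch(name):
    # Return the pending messages and clear them, as if they were consumed.
    messages, backlog[name] = backlog.get(name, []), []
    return messages


sub = PollingSubscriber(fetch, interval=0.01)
sub.subscribe('darn_good_queue')
name, message = next(sub.listen())
print(name, message)
```

Passing `sleep` as a parameter keeps the loop testable, since a test can substitute a function that does not actually wait.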
Queue Handling
>>> queue = next(q for q in client.queues() if q.name == 'tacocat')
>>> queue.name
u'tacocat'
>>> queue.href
u'/v1/queues/tacocat'
>>> queue.stats  # a dictionary derived from a JSON response
>>> queue.messages(include_claimed=False)
<generator object <genexpr> at 0x7fd3ef1ed742>
>>> queue.messages(ids=['50b68a50d6f5b8c8a7c62b01', '50b68a50d6f5b8c8a7c62b02'], claim='a28ee94e-6cb4-11e2-b4d5-7703267a7926', limit=1)
<generator object <genexpr> at 0x7fd3ef1ed742>
>>> queue.post_messages(messages=...)
>>> queue.metadata
<Metadata ...>  # fetches metadata from API, returns a Metadata controller
>>> queue.claim(limit=10)
<Claim size:8 ...>  # size: actual number of messages received
>>> queue.delete()
Queue Metadata Handling
>>> meta = queue.metadata
>>> meta.update({'max_size': 1000})  # communicates with the API; replaces
>>> meta.reload()  # gets most recent attributes from API
Message Handling
>>> message = next(queue.messages(...))
>>> message.age
90
>>> message.ttl
120
>>> message.href
u'/v1/queues/darn_good_queue/messages/91wqe9bqwsbq98'
>>> message.body
{u'action': u'win'}
>>> message.reload()
>>> message.delete()
>>> message.status
<EnumValue: Message.Free [value=1]>
Claim Management
>>> claim = queue.claim(limit=10)
>>> claim.messages(...)
<generator object <genexpr> at 0x7fd3ef1ed742>
>>> msg = next(claim.messages(...))
>>> msg.status
<EnumValue: Message.Claimed [value=2]>
>>> claim.href
u'/v1/queues/tacocat/claims/a28ee94e-6cb4-11e2-b4d5-7703267a7926'
>>> claim.ttl
90
>>> claim.grace
30
>>> claim.patch(ttl=..., grace=...)
>>> claim.delete()
Error Management
The wiki gives a thorough explanation of Marconi Errors. Error handling at the client-level is a matter of transforming responses returned by Marconi into exceptions that are meaningful to users.
Here's a quick mock-up of error usage at the level of the client:
>>> error = marconi.error.ErrorBase()
>>> error.title
u'...'
>>> error.description
u'...'
>>> error.code
1092
>>> raise error
ErrorBase (error.code): error.title error.description
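Transforming a response into an exception could be sketched like this. Everything beyond the `title`, `description`, and `code` attributes from the mock-up is an assumption: the `QueueNotFound` subclass, the `ERRORS_BY_CODE` mapping, the `raise_for_error` helper, and the shape of the error body are all hypothetical.

```python
class ErrorBase(Exception):
    """Sketch of a base error carrying the fields shown in the mock-up."""

    def __init__(self, title, description, code):
        super(ErrorBase, self).__init__(title)
        self.title = title
        self.description = description
        self.code = code

    def __str__(self):
        return '%s (%s): %s %s' % (
            type(self).__name__, self.code, self.title, self.description)


class QueueNotFound(ErrorBase):
    """Hypothetical subclass for one specific failure."""


# Hypothetical mapping from an error code in the response body to a class.
ERRORS_BY_CODE = {1092: QueueNotFound}


def raise_for_error(status, body):
    """Raise a client exception for an error response; return body otherwise."""
    if status < 400:
        return body
    cls = ERRORS_BY_CODE.get(body.get('code'), ErrorBase)
    raise cls(body.get('title', ''), body.get('description', ''), body.get('code'))


try:
    raise_for_error(404, {'title': 'Not found',
                          'description': 'No such queue',
                          'code': 1092})
except ErrorBase as error:
    caught = error   # keep a reference; the "as" name is cleared after the block
    print(error)
```

Falling back to `ErrorBase` for unknown codes means callers can always catch one base class, while specific subclasses remain available where finer handling is useful.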
Workflow
This section is more top-down than the rest. Consider this what it feels like to spin up python-marconiclient in an IPython environment:
>>> from marconiclient.connection import Connection
>>> from marconiclient.client import Client
>>> client = Connection(auth_url='https://keystone.example.com/', username='me', password='win')
>>> client.create_queue('wot')
>>> queue = next(client.queues())
>>> queue.post_messages(messages=[{'event': {'data': 'winning', 'score': 10}}])
>>> message = next(queue.messages())
>>> message.body
{'event': {'data': 'winning', 'score': 10}}
>>> message.status
<Free...>
>>> queue.stats
{...}
>>> claim = queue.claim(1)
>>> message = next(claim.messages())
>>> message.status
<Claimed...>
>>> message
<Message ttl:120>
>>> message.delete()
>>> claim
<Claim size:1>
>>> claim.delete()
>>> queue
<Queue [wot]>
>>> queue.delete()
>>> client.async
False