kazoo.recipe.queue

ZooKeeper-based queue implementations.

Maintainer

None

Status

Possibly Buggy

Note

This queue has been reported to leak memory over long-running periods. See: https://github.com/python-zk/kazoo/issues/175

New in version 0.6: The Queue class.

New in version 1.0: The LockingQueue class.

Public API

class kazoo.recipe.queue.Queue(client, path)[source]

A distributed queue with optional priority support.

This queue does not offer reliable consumption: an entry is removed from the queue before it is processed. If an error occurs during processing, the consumer must re-queue the item or it is lost.
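The re-queue responsibility described above can be sketched as a small wrapper. This is a minimal illustration, not part of the kazoo API: process_next and handler are hypothetical names, and the only queue methods it relies on are get() and put() as documented on this page.

```python
def process_next(queue, handler):
    """Take one item from `queue`, run `handler` on it, re-queue on failure.

    `queue` is any object with the get()/put() methods documented for
    Queue. Returns True if an item was handled successfully, False if the
    queue was empty or the handler raised.
    """
    item = queue.get()  # the entry is already removed from the queue here
    if item is None:
        return False
    try:
        handler(item)
    except Exception:
        # Without this put() the item would be lost for good.
        queue.put(item)
        return False
    return True


# Usage with a real client (requires a running ZooKeeper ensemble; the host
# string and queue path below are placeholders):
#
#     from kazoo.client import KazooClient
#     from kazoo.recipe.queue import Queue
#
#     client = KazooClient(hosts="127.0.0.1:2181")
#     client.start()
#     queue = Queue(client, "/example/queue")
#     queue.put(b"job-1")
#     process_next(queue, print)
#     client.stop()
```

Note that this sketch re-queues with the default priority, so an item that was originally put with a different priority would need that priority tracked by the caller.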

__init__(client, path)[source]
Parameters
  • client – A KazooClient instance.

  • path – The queue path to use in ZooKeeper.

__len__()[source]

Return queue size.

get()[source]

Get item data and remove an item from the queue.

Returns

Item data or None.

Return type

bytes

put(value, priority=100)[source]

Put an item into the queue.

Parameters
  • value – Byte string to put into the queue.

  • priority – An optional priority as an integer with at most 3 digits. Lower values signify higher priority.
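One plausible reason for the three-digit limit is that a fixed-width, zero-padded priority sorts the same way as text and as a number. The sketch below illustrates that idea only. The name format in entry_name and the accepted range of 0 to 999 are assumptions made for this example, not details stated on this page, and check_priority and entry_name are hypothetical helpers.

```python
def check_priority(priority):
    """Reject priorities that do not fit in three digits (assumed 0..999)."""
    if isinstance(priority, bool) or not isinstance(priority, int):
        raise TypeError("priority must be an integer")
    if not 0 <= priority <= 999:
        raise ValueError("priority must be between 0 and 999")
    return priority


def entry_name(priority, sequence):
    """Build a hypothetical sortable entry name (the format is an assumption)."""
    return "entry-{:03d}-{:010d}".format(check_priority(priority), sequence)


# Entries created in this order: priority 100, 5, 100, 999.
names = [
    entry_name(100, 0),
    entry_name(5, 1),
    entry_name(100, 2),
    entry_name(999, 3),
]

# A plain string sort puts the lowest priority value first and keeps
# insertion order among entries that share a priority.
ordered = sorted(names)
```

With a fourth digit the zero-padding would no longer line up, and string order would stop matching numeric order.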

class kazoo.recipe.queue.LockingQueue(client, path)[source]

A distributed queue with priority and locking support.

When an entry is retrieved from the queue, it is locked with an ephemeral node instead of being deleted. If an error occurs, the lock is released so that another consumer can take the entry. This adds a small overhead compared to the Queue implementation.

Call the LockingQueue.get() method first to lock and retrieve the next entry. When processing is finished, call the LockingQueue.consume() method to remove the entry from the queue.

This queue does not track the connection status with ZooKeeper. If a node locks an element, loses its connection to ZooKeeper, and later reconnects, ZooKeeper will probably have removed the lock in the meantime, yet the node will still believe that it holds the lock. Either check the connection status with ZooKeeper or call the LockingQueue.holds_lock() method to verify that the node still holds the lock.
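The get, check, consume sequence described above might look like the following. This is a sketch under stated assumptions: handle_one and its string return values are hypothetical, and it uses only the get(), holds_lock(), consume() and release() methods documented on this page.

```python
def handle_one(queue, handler, timeout=5):
    """Lock, process, and consume one entry from a LockingQueue-like object.

    Returns one of "empty", "released", "lost-lock" or "consumed".
    """
    item = queue.get(timeout=timeout)  # locks the entry; None on timeout
    if item is None:
        return "empty"
    try:
        handler(item)
    except Exception:
        # Processing failed: unlock so another consumer can take the entry.
        queue.release()
        return "released"
    if not queue.holds_lock():
        # The connection may have dropped while the handler ran, so another
        # consumer may own the entry now. The caller decides how to recover.
        return "lost-lock"
    return "consumed" if queue.consume() else "lost-lock"


# Usage with a real client (requires ZooKeeper 3.4 or above; the host string
# and queue path below are placeholders):
#
#     from kazoo.client import KazooClient
#     from kazoo.recipe.queue import LockingQueue
#
#     client = KazooClient(hosts="127.0.0.1:2181")
#     client.start()
#     queue = LockingQueue(client, "/example/locking-queue")
#     queue.put(b"job-1")
#     print(handle_one(queue, print))
#     client.stop()
```

Checking holds_lock() right before consume() narrows the window in which a lost lock goes unnoticed, but it does not make the handler's side effects atomic with the removal. Handlers that may run twice should be idempotent.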

Note

LockingQueue requires ZooKeeper 3.4 or above, since it uses transactions.

__init__(client, path)[source]
Parameters
  • client – A KazooClient instance.

  • path – The queue path to use in ZooKeeper.

__len__()[source]

Returns the current length of the queue.

Returns

Queue size, including locked entries.

consume()[source]

Removes the entry that is currently being processed from the queue.

Returns

True if element was removed successfully, False otherwise.

Return type

bool

get(timeout=None)[source]

Locks and gets an entry from the queue. If a previously retrieved entry has not been consumed, this method returns that entry.

Parameters

timeout – Maximum waiting time in seconds. If None, the call waits until an entry appears in the queue.

Returns

A locked entry value or None if the timeout was reached.

Return type

bytes
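Because get() returns None once the timeout is reached, a consumer can use the timeout as an idle cutoff. The following is a minimal sketch: drain and idle_timeout are hypothetical names, and the loop relies only on the documented behaviour of get() and consume().

```python
def drain(queue, handler, idle_timeout=2.0):
    """Process entries until get() times out; return how many were consumed.

    An exception raised by `handler` propagates to the caller with the
    entry still locked.
    """
    handled = 0
    while True:
        item = queue.get(timeout=idle_timeout)
        if item is None:
            # No entry appeared within idle_timeout seconds.
            break
        handler(item)
        if not queue.consume():
            # The entry could not be removed, for example because the lock
            # is no longer held. Stop here: get() would otherwise keep
            # returning the same unconsumed entry.
            break
        handled += 1
    return handled
```

Stopping on a failed consume() is a deliberate choice in this sketch. Retrying in place would loop on the same entry, since an unconsumed entry is returned again by the next get().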

holds_lock()[source]

Checks if a node still holds the lock.

Returns

True if a node still holds the lock, False otherwise.

Return type

bool

put(value, priority=100)[source]

Put an entry into the queue.

Parameters
  • value – Byte string to put into the queue.

  • priority – An optional priority as an integer with at most 3 digits. Lower values signify higher priority.

put_all(values, priority=100)[source]

Put several entries into the queue. The action only succeeds if all entries were put into the queue.

Parameters
  • values – A list of values to put into the queue.

  • priority – An optional priority as an integer with at most 3 digits. Lower values signify higher priority.
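A batch submission through put_all() could be prepared as shown below. This is a sketch with assumptions: encode_jobs and enqueue_batch are hypothetical helpers, JSON is just one possible encoding, and each value is assumed to need to be a byte string, as documented for put().

```python
import json


def encode_jobs(jobs):
    """Serialize each job dict to a UTF-8 encoded JSON byte string."""
    return [json.dumps(job, sort_keys=True).encode("utf-8") for job in jobs]


def enqueue_batch(queue, jobs, priority=100):
    """Submit all jobs in a single put_all() call.

    Returns the number of entries submitted. Nothing is sent for an
    empty batch.
    """
    payloads = encode_jobs(jobs)
    if payloads:
        # Either every payload is stored or none is.
        queue.put_all(payloads, priority=priority)
    return len(payloads)
```

All entries in one batch share the same priority, so jobs with different priorities need separate put_all() calls.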

release()[source]

Removes the lock from the currently processed item without consuming it.

Returns

True if the lock was removed successfully, False otherwise.

Return type

bool