API Reference¶
-
aio_pika.
connect
(url: str = None, *, host: str = 'localhost', port: int = 5672, login: str = 'guest', password: str = 'guest', virtualhost: str = '/', ssl: bool = False, loop: asyncio.events.AbstractEventLoop = None, ssl_options: dict = None, timeout: Union[int, float] = None, connection_class: Type[ConnectionType] = <class 'aio_pika.connection.Connection'>, client_properties: dict = None, **kwargs) → ConnectionType[source]¶ Make connection to the broker.
Example:
import aio_pika async def main(): connection = await aio_pika.connect( "amqp://guest:guest@127.0.0.1/" )
Connect to localhost with default credentials:
import aio_pika async def main(): connection = await aio_pika.connect()
Note
- The available keys for ssl_options parameter are:
- cert_reqs
- certfile
- keyfile
- ssl_version
For an information on what the ssl_options can be set to reference the official Python documentation .
Set connection name for RabbitMQ admin panel:
read_connection = await connect( client_properties={ 'connection_name': 'Read connection' } ) write_connection = await connect( client_properties={ 'connection_name': 'Write connection' } )
URL string might be contain ssl parameters e.g. amqps://user:pass@host//?ca_certs=ca.pem&certfile=crt.pem&keyfile=key.pem
Parameters: - client_properties – add custom client capability.
- url – RFC3986 formatted broker address. When
None
will be used keyword arguments. - host – hostname of the broker
- port – broker port 5672 by default
- login – username string. ‘guest’ by default.
- password – password string. ‘guest’ by default.
- virtualhost – virtualhost parameter. ‘/’ by default
- ssl – use SSL for connection. Should be used with addition kwargs.
- ssl_options – A dict of values for the SSL connection.
- timeout – connection timeout in seconds
- loop – Event loop (
asyncio.get_event_loop()
whenNone
) - connection_class – Factory of a new connection
- kwargs – addition parameters which will be passed to the connection.
Returns: aio_pika.connection.Connection
-
aio_pika.
connect_robust
(url: str = None, *, host: str = 'localhost', port: int = 5672, login: str = 'guest', password: str = 'guest', virtualhost: str = '/', ssl: bool = False, loop: asyncio.events.AbstractEventLoop = None, ssl_options: dict = None, timeout: Union[int, float] = None, connection_class: Type[ConnectionType] = <class 'aio_pika.robust_connection.RobustConnection'>, client_properties: dict = None, **kwargs) → ConnectionType[source]¶ Make robust connection to the broker.
That means that connection state will be restored after reconnect. After connection has been established the channels, the queues and the exchanges with their bindings will be restored.
Example:
import aio_pika async def main(): connection = await aio_pika.connect_robust( "amqp://guest:guest@127.0.0.1/" )
Connect to localhost with default credentials:
import aio_pika async def main(): connection = await aio_pika.connect_robust()
Note
- The available keys for ssl_options parameter are:
- cert_reqs
- certfile
- keyfile
- ssl_version
For an information on what the ssl_options can be set to reference the official Python documentation .
URL string might be contain ssl parameters e.g. amqps://user:pass@host//?ca_certs=ca.pem&certfile=crt.pem&keyfile=key.pem
Parameters: - url – RFC3986 formatted broker address. When
None
will be used keyword arguments. - host – hostname of the broker
- port – broker port 5672 by default
- login – username string. ‘guest’ by default.
- password – password string. ‘guest’ by default.
- virtualhost – virtualhost parameter. ‘/’ by default
- ssl – use SSL for connection. Should be used with addition kwargs.
- ssl_options – A dict of values for the SSL connection.
- timeout – connection timeout in seconds
- loop – Event loop (
asyncio.get_event_loop()
whenNone
) - connection_class – Factory of a new connection
- kwargs – addition parameters which will be passed to the connection.
Returns: aio_pika.connection.Connection
-
aio_pika.
AMQPException
¶ alias of
aiormq.exceptions.AMQPError
-
class
aio_pika.
Channel
(connection, channel_number: Optional[int] = None, publisher_confirms: bool = True, on_return_raises: bool = False)[source]¶ Channel abstraction
Parameters: - connection –
aio_pika.adapter.AsyncioConnection
instance - loop – Event loop (
asyncio.get_event_loop()
whenNone
) - future_store –
aio_pika.common.FutureStore
instance - publisher_confirms – False if you don’t need delivery confirmations (in pursuit of performance)
-
EXCHANGE_CLASS
¶ alias of
aio_pika.exchange.Exchange
-
QUEUE_CLASS
¶ alias of
aio_pika.queue.Queue
-
declare_exchange
(name: str, type: Union[aio_pika.exchange.ExchangeType, str] = <ExchangeType.DIRECT: 'direct'>, durable: bool = None, auto_delete: bool = False, internal: bool = False, passive: bool = False, arguments: dict = None, timeout: Union[int, float] = None) → aio_pika.exchange.Exchange[source]¶ Declare an exchange.
Parameters: - name – string with exchange name or
aio_pika.exchange.Exchange
instance - type – Exchange type. Enum ExchangeType value or string. String values must be one of ‘fanout’, ‘direct’, ‘topic’, ‘headers’, ‘x-delayed-message’, ‘x-consistent-hash’.
- durable – Durability (exchange survive broker restart)
- auto_delete – Delete queue when channel will be closed.
- internal – Do not send it to broker just create an object
- passive – Do not fail when entity was declared
previously but has another params. Raises
aio_pika.exceptions.ChannelClosed
when exchange doesn’t exist. - arguments – additional arguments
- timeout – execution timeout
Returns: aio_pika.exchange.Exchange
instance- name – string with exchange name or
-
declare_queue
(name: str = None, *, durable: bool = None, exclusive: bool = False, passive: bool = False, auto_delete: bool = False, arguments: dict = None, timeout: Union[int, float] = None) → aio_pika.queue.Queue[source]¶ Parameters: - name – queue name
- durable – Durability (queue survive broker restart)
- exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed.
- passive – Do not fail when entity was declared
previously but has another params. Raises
aio_pika.exceptions.ChannelClosed
when queue doesn’t exist. - auto_delete – Delete queue when channel will be closed.
- arguments – additional arguments
- timeout – execution timeout
Returns: aio_pika.queue.Queue
instanceRaises: aio_pika.exceptions.ChannelClosed
instance
-
get_exchange
(name: str, *, ensure: bool = True) → aio_pika.exchange.Exchange[source]¶ With
ensure=True
, it’s a shortcut for.declare_exchange(..., passive=True)
; otherwise, it returns an exchange instance without checking its existence.When the exchange does not exist, if
ensure=True
, will raiseaio_pika.exceptions.ChannelClosed
.Use this method in a separate channel (or as soon as channel created). This is only a way to get an exchange without declaring a new one.
Parameters: - name – exchange name
- ensure – ensure that the exchange exists
Returns: aio_pika.exchange.Exchange
instanceRaises: aio_pika.exceptions.ChannelClosed
instance
-
get_queue
(name: str, *, ensure: bool = True) → aio_pika.queue.Queue[source]¶ With
ensure=True
, it’s a shortcut for.declare_queue(..., passive=True)
; otherwise, it returns a queue instance without checking its existence.When the queue does not exist, if
ensure=True
, will raiseaio_pika.exceptions.ChannelClosed
.Use this method in a separate channel (or as soon as channel created). This is only a way to get a queue without declaring a new one.
Parameters: - name – queue name
- ensure – ensure that the queue exists
Returns: aio_pika.queue.Queue
instanceRaises: aio_pika.exceptions.ChannelClosed
instance
- connection –
-
class
aio_pika.
Connection
(url, loop: Optional[asyncio.events.AbstractEventLoop] = None, **kwargs)[source]¶ Connection abstraction
-
CHANNEL_CLASS
¶ alias of
aio_pika.channel.Channel
-
add_close_callback
(callback: Callable[[Any, Optional[BaseException]], None], weak: bool = False)[source]¶ Add callback which will be called after connection will be closed.
BaseException
or None will be passed as a first argument.Example:
import aio_pika async def main(): connection = await aio_pika.connect( "amqp://guest:guest@127.0.0.1/" ) connection.add_close_callback(print) await connection.close() # None
Returns: None
-
channel
(channel_number: int = None, publisher_confirms: bool = True, on_return_raises: bool = False) → aio_pika.channel.Channel[source]¶ Coroutine which returns new instance of
Channel
.Example:
import aio_pika async def main(loop): connection = await aio_pika.connect( "amqp://guest:guest@127.0.0.1/" ) channel1 = connection.channel() await channel1.close() # Creates channel with specific channel number channel42 = connection.channel(42) await channel42.close() # For working with transactions channel_no_confirms = connection.channel( publisher_confirms=True ) await channel_no_confirms.close()
Also available as an asynchronous context manager:
import aio_pika async def main(loop): connection = await aio_pika.connect( "amqp://guest:guest@127.0.0.1/" ) async with connection.channel() as channel: # channel is open and available # channel is now closed
Parameters: - channel_number – specify the channel number explicit
- publisher_confirms – if True the
aio_pika.Exchange.publish()
method will be returnbool
after publish is complete. Otherwise theaio_pika.Exchange.publish()
method will be returnNone
- on_return_raises – raise an
aio_pika.exceptions.DeliveryError
when mandatory message will be returned
-
connect
(timeout: Union[int, float] = None, **kwargs)[source]¶ Connect to AMQP server. This method should be called after
aio_pika.connection.Connection.__init__()
Note
This method is called by
connect()
. You shouldn’t call it explicitly.
-
heartbeat_last
¶ returns loop.time() value since last received heartbeat
-
-
class
aio_pika.
Exchange
(connection, channel: aiormq.channel.Channel, name: str, type: Union[aio_pika.exchange.ExchangeType, str] = <ExchangeType.DIRECT: 'direct'>, *, auto_delete: Optional[bool], durable: Optional[bool], internal: Optional[bool], passive: Optional[bool], arguments: dict = None)[source]¶ Exchange abstraction
-
bind
(exchange: Union[Exchange, str], routing_key: str = '', *, arguments: dict = None, timeout: Union[int, float] = None) → pamqp.specification.Exchange.BindOk[source]¶ A binding can also be a relationship between two exchanges. This can be simply read as: this exchange is interested in messages from another exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.
client = await connect() routing_key = 'simple_routing_key' src_exchange_name = "source_exchange" dest_exchange_name = "destination_exchange" channel = await client.channel() src_exchange = await channel.declare_exchange( src_exchange_name, auto_delete=True ) dest_exchange = await channel.declare_exchange( dest_exchange_name, auto_delete=True ) queue = await channel.declare_queue(auto_delete=True) await queue.bind(dest_exchange, routing_key) await dest_exchange.bind(src_exchange, routing_key)
Parameters: - exchange –
aio_pika.exchange.Exchange
instance - routing_key – routing key
- arguments – additional arguments
- timeout – execution timeout
Returns: None
- exchange –
-
delete
(if_unused: bool = False, timeout: Union[int, float] = None) → pamqp.specification.Exchange.DeleteOk[source]¶ Delete the queue
Parameters: - timeout – operation timeout
- if_unused – perform deletion when queue has no bindings.
-
publish
(message: aio_pika.message.Message, routing_key: str, *, mandatory: bool = True, immediate: bool = False, timeout: Union[int, float] = None) → Union[pamqp.specification.Basic.Ack, pamqp.specification.Basic.Nack, pamqp.specification.Basic.Reject, None][source]¶ Publish the message to the queue. aio-pika uses publisher confirms extension for message delivery.
-
unbind
(exchange: Union[Exchange, str], routing_key: str = '', arguments: dict = None, timeout: Union[int, float] = None) → pamqp.specification.Exchange.UnbindOk[source]¶ Remove exchange-to-exchange binding for this
Exchange
instanceParameters: - exchange –
aio_pika.exchange.Exchange
instance - routing_key – routing key
- arguments – additional arguments
- timeout – execution timeout
Returns: None
- exchange –
-
-
class
aio_pika.
IncomingMessage
(message: aiormq.types.DeliveredMessage, no_ack: bool = False)[source]¶ Incoming message is seems like Message but has additional methods for message acknowledgement.
Depending on the acknowledgement mode used, RabbitMQ can consider a message to be successfully delivered either immediately after it is sent out (written to a TCP socket) or when an explicit (“manual”) client acknowledgement is received. Manually sent acknowledgements can be positive or negative and use one of the following protocol methods:
- basic.ack is used for positive acknowledgements
- basic.nack is used for negative acknowledgements (note: this is a RabbitMQ extension to AMQP 0-9-1)
- basic.reject is used for negative acknowledgements but has one limitations compared to basic.nack
Positive acknowledgements simply instruct RabbitMQ to record a message as delivered. Negative acknowledgements with basic.reject have the same effect. The difference is primarily in the semantics: positive acknowledgements assume a message was successfully processed while their negative counterpart suggests that a delivery wasn’t processed but still should be deleted.
Create an instance of
IncomingMessage
-
ack
(multiple: bool = False) → _asyncio.Task[source]¶ Send basic.ack is used for positive acknowledgements
Note
This method looks like a blocking-method, but actually it just sends bytes to the socket and doesn’t require any responses from the broker.
Parameters: multiple – If set to True, the message’s delivery tag is treated as “up to and including”, so that multiple messages can be acknowledged with a single method. If set to False, the ack refers to a single message. Returns: None
-
process
(requeue=False, reject_on_redelivered=False, ignore_processed=False) → AbstractAsyncContextManager[T_co][source]¶ Context manager for processing the message
>>> async def on_message_received(message: IncomingMessage): ... async with message.process(): ... # When exception will be raised ... # the message will be rejected ... print(message.body)
Example with ignore_processed=True
>>> async def on_message_received(message: IncomingMessage): ... async with message.process(ignore_processed=True): ... # Now (with ignore_processed=True) you may reject ... # (or ack) message manually too ... if True: # some reasonable condition here ... await message.reject() ... print(message.body)
Parameters: - requeue – Requeue message when exception.
- reject_on_redelivered – When True message will be rejected only when message was redelivered.
- ignore_processed – Do nothing if message already processed
-
reject
(requeue: bool = False) → _asyncio.Task[source]¶ When requeue=True the message will be returned to queue. Otherwise message will be dropped.
Note
This method looks like a blocking-method, but actually it just sends bytes to the socket and doesn’t require any responses from the broker.
Parameters: requeue – bool
-
class
aio_pika.
Message
(body: bytes, *, headers: dict = None, content_type: str = None, content_encoding: str = None, delivery_mode: aio_pika.message.DeliveryMode = None, priority: int = None, correlation_id=None, reply_to: str = None, expiration: Union[int, datetime.datetime, float, datetime.timedelta, None] = None, message_id: str = None, timestamp: Union[int, datetime.datetime, float, datetime.timedelta, None] = None, type: str = None, user_id: str = None, app_id: str = None)[source]¶ AMQP message abstraction
Creates a new instance of Message
Parameters: - body – message body
- headers – message headers
- headers_raw – message raw headers
- content_type – content type
- content_encoding – content encoding
- delivery_mode – delivery mode
- priority – priority
- correlation_id – correlation id
- reply_to – reply to
- expiration – expiration in seconds (or datetime or timedelta)
- message_id – message id
- timestamp – timestamp
- type – type
- user_id – user id
- app_id – app id
-
info
() → dict[source]¶ Create a dict with message attributes
{ "body_size": 100, "headers": {}, "content_type": "text/plain", "content_encoding": "", "delivery_mode": DeliveryMode.NOT_PERSISTENT, "priority": 0, "correlation_id": "", "reply_to": "", "expiration": "", "message_id": "", "timestamp": "", "type": "", "user_id": "", "app_id": "", }
-
locked
¶ is message locked
Returns: bool
-
properties
¶ Build
aiormq.spec.Basic.Properties
object
-
class
aio_pika.
Queue
(connection, channel: aiormq.channel.Channel, name, durable, exclusive, auto_delete, arguments, passive: bool = False)[source]¶ AMQP queue abstraction
-
bind
(exchange: Union[Exchange, str], routing_key: str = None, *, arguments=None, timeout: int = None) → pamqp.specification.Queue.BindOk[source]¶ A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.
Parameters: - exchange –
aio_pika.exchange.Exchange
instance - routing_key – routing key
- arguments – additional arguments
- timeout – execution timeout
Raises: asyncio.TimeoutError – when the binding timeout period has elapsed.
Returns: None
- exchange –
-
cancel
(consumer_tag: str, timeout=None, nowait: bool = False) → pamqp.specification.Basic.CancelOk[source]¶ This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion.
Parameters: - consumer_tag – consumer tag returned by
consume()
- timeout – execution timeout
- nowait (bool) – Do not expect a Basic.CancelOk response
Returns: Basic.CancelOk when operation completed successfully
- consumer_tag – consumer tag returned by
-
consume
(callback: Callable[[aio_pika.message.IncomingMessage], Any], no_ack: bool = False, exclusive: bool = False, arguments: dict = None, consumer_tag=None, timeout=None) → str[source]¶ Start to consuming the
Queue
.Parameters: - timeout –
asyncio.TimeoutError
will be raises when the Future was not finished after this time. - callback – Consuming callback. Could be a coroutine.
- no_ack – if
True
you don’t need to callaio_pika.message.IncomingMessage.ack()
- exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed.
- arguments – additional arguments
- consumer_tag – optional consumer tag
Raises: asyncio.TimeoutError – when the consuming timeout period has elapsed.
Return str: consumer tag
str
- timeout –
-
declare
(timeout: int = None) → pamqp.specification.Queue.DeclareOk[source]¶ Declare queue.
Parameters: - timeout – execution timeout
- passive – Only check to see if the queue exists.
Returns: None
-
delete
(*, if_unused=True, if_empty=True, timeout=None) → pamqp.specification.Queue.DeclareOk[source]¶ Delete the queue.
Parameters: - if_unused – Perform delete only when unused
- if_empty – Perform delete only when empty
- timeout – execution timeout
Returns: None
-
get
(*, no_ack=False, fail=True, timeout=5) → Optional[aio_pika.message.IncomingMessage][source]¶ Get message from the queue.
Parameters: - no_ack – if
True
you don’t need to callaio_pika.message.IncomingMessage.ack()
- timeout – execution timeout
- fail – Should return
None
instead of raise an exceptionaio_pika.exceptions.QueueEmpty
.
Returns: aio_pika.message.IncomingMessage
- no_ack – if
-
iterator
(**kwargs) → aio_pika.queue.QueueIterator[source]¶ Returns an iterator for async for expression.
Full example:
import aio_pika async def main(): connection = await aio_pika.connect() async with connection: channel = await connection.channel() queue = await channel.declare_queue('test') async with queue.iterator() as q: async for message in q: print(message.body)
When your program runs with run_forever the iterator will be closed in background. In this case the context processor for iterator might be skipped and the queue might be used in the “async for” expression directly.
import aio_pika async def main(): connection = await aio_pika.connect() async with connection: channel = await connection.channel() queue = await channel.declare_queue('test') async for message in queue: print(message.body)
Returns: QueueIterator
-
purge
(no_wait=False, timeout=None) → pamqp.specification.Queue.PurgeOk[source]¶ Purge all messages from the queue.
Parameters: - no_wait – no wait response
- timeout – execution timeout
Returns: None
-
unbind
(exchange: Union[Exchange, str], routing_key: str = None, arguments: dict = None, timeout: int = None) → pamqp.specification.Queue.UnbindOk[source]¶ Remove binding from exchange for this
Queue
instanceParameters: - exchange –
aio_pika.exchange.Exchange
instance - routing_key – routing key
- arguments – additional arguments
- timeout – execution timeout
Raises: asyncio.TimeoutError – when the unbinding timeout period has elapsed.
Returns: None
- exchange –
-
-
class
aio_pika.
RobustChannel
(connection, channel_number: int = None, publisher_confirms: bool = True, on_return_raises: bool = False)[source]¶ Channel abstraction
Parameters: - connection –
aio_pika.adapter.AsyncioConnection
instance - loop – Event loop (
asyncio.get_event_loop()
whenNone
) - future_store –
aio_pika.common.FutureStore
instance - publisher_confirms – False if you don’t need delivery confirmations (in pursuit of performance)
-
EXCHANGE_CLASS
¶ alias of
aio_pika.robust_exchange.RobustExchange
-
QUEUE_CLASS
¶ alias of
aio_pika.robust_queue.RobustQueue
-
declare_exchange
(name: str, type: Union[aio_pika.exchange.ExchangeType, str] = <ExchangeType.DIRECT: 'direct'>, durable: bool = None, auto_delete: bool = False, internal: bool = False, passive: bool = False, arguments: dict = None, timeout: Union[int, float] = None, robust: bool = True) → aio_pika.exchange.Exchange[source]¶ Declare an exchange.
Parameters: - name – string with exchange name or
aio_pika.exchange.Exchange
instance - type – Exchange type. Enum ExchangeType value or string. String values must be one of ‘fanout’, ‘direct’, ‘topic’, ‘headers’, ‘x-delayed-message’, ‘x-consistent-hash’.
- durable – Durability (exchange survive broker restart)
- auto_delete – Delete queue when channel will be closed.
- internal – Do not send it to broker just create an object
- passive – Do not fail when entity was declared
previously but has another params. Raises
aio_pika.exceptions.ChannelClosed
when exchange doesn’t exist. - arguments – additional arguments
- timeout – execution timeout
Returns: aio_pika.exchange.Exchange
instance- name – string with exchange name or
-
declare_queue
(name: str = None, *, durable: bool = None, exclusive: bool = False, passive: bool = False, auto_delete: bool = False, arguments: dict = None, timeout: Union[int, float] = None, robust: bool = True) → aio_pika.queue.Queue[source]¶ Parameters: - name – queue name
- durable – Durability (queue survive broker restart)
- exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed.
- passive – Do not fail when entity was declared
previously but has another params. Raises
aio_pika.exceptions.ChannelClosed
when queue doesn’t exist. - auto_delete – Delete queue when channel will be closed.
- arguments – additional arguments
- timeout – execution timeout
Returns: aio_pika.queue.Queue
instanceRaises: aio_pika.exceptions.ChannelClosed
instance
- connection –
-
class
aio_pika.
RobustConnection
(url, loop=None, **kwargs)[source]¶ Robust connection
-
CHANNEL_CLASS
¶ alias of
aio_pika.robust_channel.RobustChannel
-
add_reconnect_callback
(callback: Callable[[], None], weak: bool = False)[source]¶ Add callback which will be called after reconnect.
Returns: None
-
channel
(channel_number: int = None, publisher_confirms: bool = True, on_return_raises=False)[source]¶ Coroutine which returns new instance of
Channel
.Example:
import aio_pika async def main(loop): connection = await aio_pika.connect( "amqp://guest:guest@127.0.0.1/" ) channel1 = connection.channel() await channel1.close() # Creates channel with specific channel number channel42 = connection.channel(42) await channel42.close() # For working with transactions channel_no_confirms = connection.channel( publisher_confirms=True ) await channel_no_confirms.close()
Also available as an asynchronous context manager:
import aio_pika async def main(loop): connection = await aio_pika.connect( "amqp://guest:guest@127.0.0.1/" ) async with connection.channel() as channel: # channel is open and available # channel is now closed
Parameters: - channel_number – specify the channel number explicit
- publisher_confirms – if True the
aio_pika.Exchange.publish()
method will be returnbool
after publish is complete. Otherwise theaio_pika.Exchange.publish()
method will be returnNone
- on_return_raises – raise an
aio_pika.exceptions.DeliveryError
when mandatory message will be returned
-
connect
(timeout: Union[int, float] = None, **kwargs)[source]¶ Connect to AMQP server. This method should be called after
aio_pika.connection.Connection.__init__()
Note
This method is called by
connect()
. You shouldn’t call it explicitly.
-
is_closed
¶ Is this connection is closed
-
-
class
aio_pika.
RobustExchange
(connection, channel: aiormq.channel.Channel, name: str, type: aio_pika.exchange.ExchangeType = <ExchangeType.DIRECT: 'direct'>, *, auto_delete: Optional[bool], durable: Optional[bool], internal: Optional[bool], passive: Optional[bool], arguments: dict = None)[source]¶ Exchange abstraction
-
bind
(exchange, routing_key: str = '', *, arguments=None, timeout: int = None, robust: bool = True)[source]¶ A binding can also be a relationship between two exchanges. This can be simply read as: this exchange is interested in messages from another exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.
client = await connect() routing_key = 'simple_routing_key' src_exchange_name = "source_exchange" dest_exchange_name = "destination_exchange" channel = await client.channel() src_exchange = await channel.declare_exchange( src_exchange_name, auto_delete=True ) dest_exchange = await channel.declare_exchange( dest_exchange_name, auto_delete=True ) queue = await channel.declare_queue(auto_delete=True) await queue.bind(dest_exchange, routing_key) await dest_exchange.bind(src_exchange, routing_key)
Parameters: - exchange –
aio_pika.exchange.Exchange
instance - routing_key – routing key
- arguments – additional arguments
- timeout – execution timeout
Returns: None
- exchange –
-
unbind
(exchange, routing_key: str = '', arguments: dict = None, timeout: int = None)[source]¶ Remove exchange-to-exchange binding for this
Exchange
instanceParameters: - exchange –
aio_pika.exchange.Exchange
instance - routing_key – routing key
- arguments – additional arguments
- timeout – execution timeout
Returns: None
- exchange –
-
-
class
aio_pika.
RobustQueue
(connection, channel: aiormq.channel.Channel, name, durable, exclusive, auto_delete, arguments, passive: bool = False)[source]¶ -
bind
(exchange: Union[Exchange, str], routing_key: str = None, *, arguments=None, timeout: int = None, robust: bool = True)[source]¶ A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.
Parameters: - exchange –
aio_pika.exchange.Exchange
instance - routing_key – routing key
- arguments – additional arguments
- timeout – execution timeout
Raises: asyncio.TimeoutError – when the binding timeout period has elapsed.
Returns: None
- exchange –
-
cancel
(consumer_tag: str, timeout=None, nowait: bool = False)[source]¶ This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion.
Parameters: - consumer_tag – consumer tag returned by
consume()
- timeout – execution timeout
- nowait (bool) – Do not expect a Basic.CancelOk response
Returns: Basic.CancelOk when operation completed successfully
- consumer_tag – consumer tag returned by
-
consume
(callback: function, no_ack: bool = False, exclusive: bool = False, arguments: dict = None, consumer_tag=None, timeout=None, robust: bool = True) → str[source]¶ Start to consuming the
Queue
.Parameters: - timeout –
asyncio.TimeoutError
will be raises when the Future was not finished after this time. - callback – Consuming callback. Could be a coroutine.
- no_ack – if
True
you don’t need to callaio_pika.message.IncomingMessage.ack()
- exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed.
- arguments – additional arguments
- consumer_tag – optional consumer tag
Raises: asyncio.TimeoutError – when the consuming timeout period has elapsed.
Return str: consumer tag
str
- timeout –
-
unbind
(exchange: Union[Exchange, str], routing_key: str = None, arguments: dict = None, timeout: int = None)[source]¶ Remove binding from exchange for this
Queue
instanceParameters: - exchange –
aio_pika.exchange.Exchange
instance - routing_key – routing key
- arguments – additional arguments
- timeout – execution timeout
Raises: asyncio.TimeoutError – when the unbinding timeout period has elapsed.
Returns: None
- exchange –
-
-
aio_pika.patterns.
base
¶ alias of
aio_pika.patterns.base
-
class
aio_pika.patterns.
Master
(channel: aio_pika.channel.Channel, requeue: bool = True, reject_on_redelivered: bool = False)[source]¶ Implements Master/Worker pattern. Usage example:
worker.py
master = Master(channel) worker = await master.create_worker('test_worker', lambda x: print(x))
master.py
master = Master(channel) await master.proxy.test_worker('foo')
Creates a new
Master
instance.Parameters: channel – Initialized instance of aio_pika.Channel
-
create_task
(channel_name: str, kwargs=None, **message_kwargs)[source]¶ Creates a new task for the worker
-
create_worker
(channel_name: str, func: Callable, **kwargs) → aio_pika.patterns.master.Worker[source]¶ Creates a new
Worker
instance.
-
-
class
aio_pika.patterns.
RPC
(channel: aio_pika.channel.Channel)[source]¶ Remote Procedure Call helper.
Create an instance
rpc = await RPC.create(channel)
Registering python function
# RPC instance passes only keyword arguments def multiply(*, x, y): return x * y await rpc.register("multiply", multiply)
Call function through proxy
assert await rpc.proxy.multiply(x=2, y=3) == 6
Call function explicit
assert await rpc.call('multiply', dict(x=2, y=3)) == 6
-
call
(method_name, kwargs: Optional[Dict[Hashable, Any]] = None, *, expiration: Optional[int] = None, priority: int = 5, delivery_mode: aio_pika.message.DeliveryMode = <DeliveryMode.NOT_PERSISTENT: 1>)[source]¶ Call remote method and awaiting result.
Parameters: - method_name – Name of method
- kwargs – Methos kwargs
- expiration – If not None messages which staying in queue longer
will be returned and
asyncio.TimeoutError
will be raised. - priority – Message priority
- delivery_mode – Call message delivery mode
Raises: - asyncio.TimeoutError – when message expired
- CancelledError – when called
RPC.cancel()
- RuntimeError – internal error
-
classmethod
create
(channel: aio_pika.channel.Channel, **kwargs) → aio_pika.patterns.rpc.RPC[source]¶ Creates a new instance of
aio_pika.patterns.RPC
. You should use this method instead of__init__()
, becausecreate()
returns coroutine and makes async initializeParameters: channel – initialized instance of aio_pika.Channel
Returns: RPC
-
deserialize
(data: Any) → bytes[source]¶ Deserialize data from bytes. Uses pickle by default. You should overlap this method when you want to change serializer
Parameters: data – Data which will be deserialized Returns: Any
-
register
(method_name, func: Callable[[P], R], **kwargs)[source]¶ Method creates a queue with name which equal of method_name argument. Then subscribes this queue.
Parameters: - method_name – Method name
- func – target function. Function MUST accept only keyword arguments.
- kwargs – arguments which will be passed to queue_declare
Raises: RuntimeError – Function already registered in this
RPC
instance or method_name already used.
-
serialize
(data: Any) → bytes[source]¶ Serialize data to the bytes. Uses pickle by default. You should overlap this method when you want to change serializer
Parameters: data – Data which will be serialized Returns: bytes
-