Quick start¶
Some useful examples.
Simple consumer¶
import asyncio
import aio_pika
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
queue_name = "test_queue"
async with connection:
# Creating channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue(queue_name, auto_delete=True)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print(message.body)
if queue.name in message.body.decode():
break
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
Simple publisher¶
import asyncio
import aio_pika
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
async with connection:
routing_key = "test_queue"
channel = await connection.channel()
await channel.default_exchange.publish(
aio_pika.Message(body="Hello {}".format(routing_key).encode()),
routing_key=routing_key,
)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
Asynchronous message processing¶
import asyncio
import aio_pika
async def process_message(message: aio_pika.IncomingMessage):
async with message.process():
print(message.body)
await asyncio.sleep(1)
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
queue_name = "test_queue"
# Creating channel
channel = await connection.channel()
# Maximum message count which will be
# processing at the same time.
await channel.set_qos(prefetch_count=100)
# Declaring queue
queue = await channel.declare_queue(queue_name, auto_delete=True)
await queue.consume(process_message)
return connection
if __name__ == "__main__":
loop = asyncio.get_event_loop()
connection = loop.run_until_complete(main(loop))
try:
loop.run_forever()
finally:
loop.run_until_complete(connection.close())
Working with RabbitMQ transactions¶
import asyncio
import aio_pika
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
async with connection:
routing_key = "test_queue"
# Transactions conflicts with `publisher_confirms`
channel = await connection.channel(publisher_confirms=False)
# Use transactions with async context manager
async with channel.transaction():
# Publishing messages but delivery will not be done
# before committing this transaction
for i in range(10):
message = aio_pika.Message(body="Hello #{}".format(i).encode())
await channel.default_exchange.publish(
message, routing_key=routing_key
)
# Using transactions manually
tx = channel.transaction()
# start transaction manually
await tx.select()
await channel.default_exchange.publish(
aio_pika.Message(body="Hello {}".format(routing_key).encode()),
routing_key=routing_key,
)
await tx.commit()
tx.close()
# Using transactions manually
tx = channel.transaction()
# start transaction manually
await tx.select()
await channel.default_exchange.publish(
aio_pika.Message(body="Should be rejected".encode()),
routing_key=routing_key,
)
await tx.rollback()
tx.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
Get single message example¶
import asyncio
from aio_pika import connect_robust, Message
async def main(loop):
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
queue_name = "test_queue"
routing_key = "test_queue"
# Creating channel
channel = await connection.channel()
# Declaring exchange
exchange = await channel.declare_exchange("direct", auto_delete=True)
# Declaring queue
queue = await channel.declare_queue(queue_name, auto_delete=True)
# Binding queue
await queue.bind(exchange, routing_key)
await exchange.publish(
Message(
bytes("Hello", "utf-8"),
content_type="text/plain",
headers={"foo": "bar"},
),
routing_key,
)
# Receiving message
incoming_message = await queue.get(timeout=5)
# Confirm message
await incoming_message.ack()
await queue.unbind(exchange, routing_key)
await queue.delete()
await connection.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
Tornado example¶
import asyncio
import tornado.ioloop
import tornado.web
from aio_pika import connect_robust, Message
tornado.ioloop.IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)
QUEUE = asyncio.Queue()
class SubscriberHandler(tornado.web.RequestHandler):
async def get(self):
message = await QUEUE.get()
self.finish(message.body)
class PublisherHandler(tornado.web.RequestHandler):
async def post(self):
connection = self.application.settings["amqp_connection"]
channel = await connection.channel()
try:
await channel.default_exchange.publish(
Message(body=self.request.body), routing_key="test",
)
finally:
await channel.close()
self.finish("OK")
async def make_app():
amqp_connection = await connect_robust()
channel = await amqp_connection.channel()
queue = await channel.declare_queue("test", auto_delete=True)
await queue.consume(QUEUE.put, no_ack=True)
return tornado.web.Application(
[(r"/publish", PublisherHandler), (r"/subscribe", SubscriberHandler)],
amqp_connection=amqp_connection,
)
if __name__ == "__main__":
app = io_loop.asyncio_loop.run_until_complete(make_app())
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
External credentials example¶
import asyncio
import aio_pika
import ssl
async def main(loop):
connection = await aio_pika.connect_robust(
host="127.0.0.1",
login="",
ssl=True,
ssl_options=dict(
ca_certs="cacert.pem",
certfile="cert.pem",
keyfile="key.pem",
cert_reqs=ssl.CERT_REQUIRED,
),
loop=loop,
)
async with connection:
routing_key = "test_queue"
channel = await connection.channel()
await channel.default_exchange.publish(
aio_pika.Message(body="Hello {}".format(routing_key).encode()),
routing_key=routing_key,
)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
Connection pooling¶
import asyncio
import aio_pika
from aio_pika.pool import Pool
async def main():
loop = asyncio.get_event_loop()
async def get_connection():
return await aio_pika.connect_robust("amqp://guest:guest@localhost/")
connection_pool = Pool(get_connection, max_size=2, loop=loop)
async def get_channel() -> aio_pika.Channel:
async with connection_pool.acquire() as connection:
return await connection.channel()
channel_pool = Pool(get_channel, max_size=10, loop=loop)
queue_name = "pool_queue"
async def consume():
async with channel_pool.acquire() as channel: # type: aio_pika.Channel
await channel.set_qos(10)
queue = await channel.declare_queue(
queue_name, durable=False, auto_delete=False
)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
print(message)
await message.ack()
async def publish():
async with channel_pool.acquire() as channel: # type: aio_pika.Channel
await channel.default_exchange.publish(
aio_pika.Message(("Channel: %r" % channel).encode()),
queue_name,
)
async with connection_pool, channel_pool:
task = loop.create_task(consume())
await asyncio.wait([publish() for _ in range(10000)])
await task
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()