Quick start¶
Some useful examples.
Simple consumer¶
import asyncio
import aio_pika
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
queue_name = "test_queue"
async with connection:
# Creating channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue(
queue_name, auto_delete=True
)
async for message in queue:
with message.process():
print(message.body)
if queue.name in message.body.decode():
break
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
Simple publisher¶
import asyncio
import aio_pika
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop)
async with connection:
routing_key = "test_queue"
channel = await connection.channel()
await channel.default_exchange.publish(
aio_pika.Message(
body='Hello {}'.format(routing_key).encode()
),
routing_key=routing_key
)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
Working with RabbitMQ transactions¶
import asyncio
import aio_pika
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop)
async with connection:
routing_key = "test_queue"
# Transactions conflicts with `publisher_confirms`
channel = await connection.channel(publisher_confirms=False)
# Use transactions with async context manager
async with channel.transaction():
# Publishing messages but delivery will not be done
# before committing this transaction
for i in range(10):
message = aio_pika.Message(
body='Hello #{}'.format(i).encode()
)
await channel.default_exchange.publish(
message, routing_key=routing_key
)
# Using transactions manually
tx = channel.transaction()
# start transaction manually
await tx.select()
await channel.default_exchange.publish(
aio_pika.Message(body='Hello {}'.format(routing_key).encode()),
routing_key=routing_key
)
await tx.commit()
tx.close()
# Using transactions manually
tx = channel.transaction()
# start transaction manually
await tx.select()
await channel.default_exchange.publish(
aio_pika.Message(body='Should be rejected'.encode()),
routing_key=routing_key
)
await tx.rollback()
tx.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
Get single message example¶
import asyncio
from aio_pika import connect_robust, Message
async def main(loop):
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
queue_name = "test_queue"
routing_key = "test_queue"
# Creating channel
channel = await connection.channel()
# Declaring exchange
exchange = await channel.declare_exchange(
'direct', auto_delete=True
)
# Declaring queue
queue = await channel.declare_queue(
queue_name, auto_delete=True
)
# Binding queue
await queue.bind(exchange, routing_key)
await exchange.publish(
Message(
bytes('Hello', 'utf-8'),
content_type='text/plain',
headers={'foo': 'bar'}
),
routing_key
)
# Receiving message
incoming_message = await queue.get(timeout=5)
# Confirm message
incoming_message.ack()
await queue.unbind(exchange, routing_key)
await queue.delete()
await connection.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
Tornado example¶
import asyncio
import tornado.ioloop
import tornado.web
from aio_pika import connect_robust, Message
tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop')
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)
QUEUE = asyncio.Queue()
class SubscriberHandler(tornado.web.RequestHandler):
async def get(self):
message = await QUEUE.get()
self.finish(message.body)
class PublisherHandler(tornado.web.RequestHandler):
async def post(self):
connection = self.application.settings['amqp_connection']
channel = await connection.channel()
try:
await channel.default_exchange.publish(
Message(body=self.request.body),
routing_key="test",
)
finally:
await channel.close()
self.finish("OK")
async def make_app():
amqp_connection = await connect_robust()
channel = await amqp_connection.channel()
queue = await channel.declare_queue('test', auto_delete=True)
await queue.consume(QUEUE.put, no_ack=True)
return tornado.web.Application(
[
(r"/publish", PublisherHandler),
(r"/subscribe", SubscriberHandler),
],
amqp_connection=amqp_connection
)
if __name__ == "__main__":
app = io_loop.asyncio_loop.run_until_complete(make_app())
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
External credentials example¶
import asyncio
import aio_pika
import ssl
async def main(loop):
connection = await aio_pika.connect_robust(host='127.0.0.1',
login='',
ssl=True,
ssl_options=dict(
ca_certs="cacert.pem",
certfile="cert.pem",
keyfile="key.pem",
cert_reqs=ssl.CERT_REQUIRED,
), loop=loop
)
async with connection:
routing_key = "test_queue"
channel = await connection.channel()
await channel.default_exchange.publish(
aio_pika.Message(
body='Hello {}'.format(routing_key).encode()
),
routing_key=routing_key
)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()