Patterns and helpers¶
Note
Available since aio-pika>=1.7.0
aio_pika includes some useful patterns for creating distributed systems.
Master/Worker¶
Helper which implements Master/Worker pattern. This applicable for balancing tasks between multiple workers.
The master creates tasks:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import Master
async def main():
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/"
)
# Creating channel
channel = await connection.channel()
master = Master(channel)
# Creates tasks by proxy object
for task_id in range(1000):
await master.proxy.my_task_name(task_id=task_id)
# Or using create_task method
for task_id in range(1000):
await master.create_task(
'my_task_name', kwargs=dict(task_id=task_id)
)
await connection.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Worker code:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import Master
async def worker(*, task_id):
print(task_id)
async def main():
connection = await connect_robust("amqp://guest:guest@127.0.0.1/")
# Creating channel
channel = await connection.channel()
master = Master(channel)
await master.create_worker('my_task_name', worker, auto_delete=True)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
The one or multiple workers executes tasks.
RPC¶
Helper which implements Remote Procedure Call pattern. This applicable for balancing tasks between multiple workers.
The caller creates tasks and awaiting results:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import RPC
async def main():
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/"
)
# Creating channel
channel = await connection.channel()
rpc = await RPC.create(channel)
# Creates tasks by proxy object
for i in range(1000):
print(await rpc.proxy.multiply(x=100, y=i))
# Or using create_task method
for i in range(1000):
print(
await rpc.call(
'multiply', kwargs=dict(x=100, y=i)
)
)
await connection.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
One or multimple callees executing tasks:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import RPC
async def multiply(*, x, y):
return x * y
async def main():
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/"
)
# Creating channel
channel = await connection.channel()
rpc = await RPC.create(channel)
await rpc.register('multiply', multiply, auto_delete=True)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()