Asyncio和Rabbitmq(Asynqp):如何从多个队列中同时使用 [英] Asyncio and rabbitmq (asynqp): how to consume from multiple queues concurrently
问题描述
我正在尝试使用python,asyncio和 asynqp 同时使用多个队列
I'm trying to consume multiple queues concurrently using python, asyncio and asynqp.
我不明白为什么我的 asyncio.sleep()
函数调用没有任何作用。代码不会在那里暂停。公平地说,我实际上不了解在哪个上下文中执行回调,以及我是否完全可以将控制权转交给事件循环(因此 asyncio.sleep()
调用会很有意义。)
I don't understand why my asyncio.sleep()
function call does not have any effect. The code doesn't pause there. To be fair, I actually don't understand in which context the callback is executed, and whether I can yield control bavck to the event loop at all (so that the asyncio.sleep()
call would make sense).
如果我不得不使用 aiohttp.ClientSession.get()
我的 process_msg
回调函数中的函数调用?我无法执行此操作,因为它不是协程。
What If I had to use a aiohttp.ClientSession.get()
function call in my process_msg
callback function? I'm not able to do it since it's not a coroutine. There has to be a way which is beyond my current understanding of asyncio.
#!/usr/bin/env python3
import asyncio
import asynqp
USERS = {'betty', 'bob', 'luis', 'tony'}
def process_msg(msg):
asyncio.sleep(10)
print('>> {}'.format(msg.body))
msg.ack()
async def connect():
connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
channel = await connection.open_channel()
exchange = await channel.declare_exchange('inboxes', 'direct')
# we have 10 users. Set up a queue for each of them
# use different channels to avoid any interference
# during message consumption, just in case.
for username in USERS:
user_channel = await connection.open_channel()
queue = await user_channel.declare_queue('Inbox_{}'.format(username))
await queue.bind(exchange, routing_key=username)
await queue.consume(process_msg)
# deliver 10 messages to each user
for username in USERS:
for msg_idx in range(10):
msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
exchange.publish(msg, routing_key=username)
loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.run_forever()
推荐答案
我不明白为什么我的asyncio.sleep()函数调用没有
无效。
I don't understand why my asyncio.sleep() function call does not have any effect.
因为 asyncio.sleep()
返回a必须与事件循环结合使用的未来对象(或 async / awa
Because asyncio.sleep()
returns a future object that has to be used in combination with an event loop (or async/await
semantics).
您不能在简单的<$ c中使用 await
$ c> def 声明,因为回调是在 async / await
上下文之外调用的,该上下文附加到幕后的某些事件循环中。换句话说,将回调样式与 async / await
样式混合起来非常棘手。
You can't use await
in simple def
declaration because the callback is called outside of async/await
context which is attached to some event loop under the hood. In other words mixing callback style with async/await
style is quite tricky.
简单的解决方案是计划将工作返回事件循环:
The simple solution though is to schedule the work back to the event loop:
async def process_msg(msg):
await asyncio.sleep(10)
print('>> {}'.format(msg.body))
msg.ack()
def _process_msg(msg):
loop = asyncio.get_event_loop()
loop.create_task(process_msg(msg))
# or if loop is always the same one single line is enough
# asyncio.ensure_future(process_msg(msg))
# some code
await queue.consume(_process_msg)
请注意,在 _process_msg
函数中没有递归,即 process_msg
的主体在<$ c中不执行$ c> _process_msg 。控件返回事件循环后,将调用内部的 process_msg
函数。
Note that there is no recursion in _process_msg
function, i.e. the body of process_msg
is not executed while in _process_msg
. The inner process_msg
function will be called once the control goes back to the event loop.
可以将其概括为以下代码:
This can be generalized with the following code:
def async_to_callback(coro):
def callback(*args, **kwargs):
asyncio.ensure_future(coro(*args, **kwargs))
return callback
async def process_msg(msg):
# the body
# some code
await queue.consume(async_to_callback(process_msg))
这篇关于Asyncio和Rabbitmq(Asynqp):如何从多个队列中同时使用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!