Asyncio和Rabbitmq(Asynqp):如何从多个队列中同时使用 [英] Asyncio and rabbitmq (asynqp): how to consume from multiple queues concurrently

查看:246
本文介绍了Asyncio和Rabbitmq(Asynqp):如何从多个队列中同时使用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用python,asyncio和 asynqp 同时使用多个队列

I'm trying to consume multiple queues concurrently using python, asyncio and asynqp.

我不明白为什么我的 asyncio.sleep()函数调用没有任何作用。代码不会在那里暂停。公平地说,我实际上不了解在哪个上下文中执行回调,以及我是否完全可以将控制权转交给事件循环(因此 asyncio.sleep()调用会很有意义。)

I don't understand why my asyncio.sleep() function call does not have any effect. The code doesn't pause there. To be fair, I actually don't understand in which context the callback is executed, and whether I can yield control bavck to the event loop at all (so that the asyncio.sleep() call would make sense).

如果我不得不使用 aiohttp.ClientSession.get()我的 process_msg 回调函数中的函数调用?我无法执行此操作,因为它不是协程。

What If I had to use a aiohttp.ClientSession.get() function call in my process_msg callback function? I'm not able to do it since it's not a coroutine. There has to be a way which is beyond my current understanding of asyncio.

#!/usr/bin/env python3

import asyncio
import asynqp


USERS = {'betty', 'bob', 'luis', 'tony'}


def process_msg(msg):
    asyncio.sleep(10)
    print('>> {}'.format(msg.body))
    msg.ack()

async def connect():
    connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
    channel = await connection.open_channel()
    exchange = await channel.declare_exchange('inboxes', 'direct')

    # we have 10 users. Set up a queue for each of them
    # use different channels to avoid any interference
    # during message consumption, just in case.
    for username in USERS:
        user_channel = await connection.open_channel()
        queue = await user_channel.declare_queue('Inbox_{}'.format(username))
        await queue.bind(exchange, routing_key=username)
        await queue.consume(process_msg)

    # deliver 10 messages to each user
    for username in USERS:
        for msg_idx in range(10):
            msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
            exchange.publish(msg, routing_key=username)


loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.run_forever()


推荐答案


我不明白为什么我的asyncio.sleep()函数调用没有
无效。

I don't understand why my asyncio.sleep() function call does not have any effect.

因为 asyncio.sleep()返回a必须与事件循环结合使用的未来对象(或 async / awa

Because asyncio.sleep() returns a future object that has to be used in combination with an event loop (or async/await semantics).

您不能在简单的<$ c中使用 await $ c> def 声明,因为回调是在 async / await 上下文之外调用的,该上下文附加到幕后的某些事件循环中。换句话说,将回调样式与 async / await 样式混合起来非常棘手。

You can't use await in simple def declaration because the callback is called outside of async/await context which is attached to some event loop under the hood. In other words mixing callback style with async/await style is quite tricky.

简单的解决方案是计划将工作返回事件循环:

The simple solution though is to schedule the work back to the event loop:

async def process_msg(msg):
    await asyncio.sleep(10)
    print('>> {}'.format(msg.body))
    msg.ack()

def _process_msg(msg):
    loop = asyncio.get_event_loop()
    loop.create_task(process_msg(msg))
    # or if loop is always the same one single line is enough
    # asyncio.ensure_future(process_msg(msg))

# some code
await queue.consume(_process_msg)

请注意,在 _process_msg 函数中没有递归,即 process_msg 的主体在<$ c中不执行$ c> _process_msg 。控件返回事件循环后,将调用内部的 process_msg 函数。

Note that there is no recursion in _process_msg function, i.e. the body of process_msg is not executed while in _process_msg. The inner process_msg function will be called once the control goes back to the event loop.

可以将其概括为以下代码:

This can be generalized with the following code:

def async_to_callback(coro):
    def callback(*args, **kwargs):
        asyncio.ensure_future(coro(*args, **kwargs))
    return callback

async def process_msg(msg):
    # the body

# some code
await queue.consume(async_to_callback(process_msg))

这篇关于Asyncio和Rabbitmq(Asynqp):如何从多个队列中同时使用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆