Using Multiple Asyncio Queues Effectively


Question

I am currently building a project that requires multiple requests to various endpoints. I am making these requests with aiohttp so that they can run asynchronously.

The problem:
I have three queues: queue1, queue2, and queue3. Additionally, I have three worker functions (worker1, worker2, worker3), each associated with its respective queue. The first queue is populated immediately with a list of IDs that is known before the program runs. When worker1 finishes a request and commits the data to a database, it passes the ID to queue2. worker2 takes this ID and requests more data. From this data it generates a list of IDs (different from the IDs in queue1/queue2) and puts them in queue3. Finally, worker3 takes each ID from queue3 and requests more data before committing it to a database.

The issue arises from the fact that queue.join() is a blocking call. Each worker is tied to a separate queue, so the join for queue1 will block until that queue is finished. This is fine, but it also defeats the purpose of using async. Without join(), the program is unable to detect when the queues are completely empty. The other issue is that there may be silent errors when one of the queues is empty but there is still data that hasn't been added to it yet.

The basic code outline is as follows:

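import asyncio
import aiohttp

queue1 = asyncio.Queue()
queue2 = asyncio.Queue()
queue3 = asyncio.Queue()
tasks = []

# The rest runs inside a coroutine, e.g. async def main()
async with aiohttp.ClientSession() as session:
    # three workers each for queue1 and queue2, ten for queue3
    for i in range(3):
        tasks.append(asyncio.create_task(worker1(queue1)))

    for i in range(3):
        tasks.append(asyncio.create_task(worker2(queue2)))

    for i in range(10):
        tasks.append(asyncio.create_task(worker3(queue3)))

    # seed the first queue with the IDs known up front
    for i in IDs:
        queue1.put_nowait(i)

    # wait for the workers (this is where the program hangs)
    await asyncio.gather(*tasks)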

The worker functions sit in an infinite loop, waiting for items to enter their queue.

Once all the data has been processed, the workers never exit and the program hangs.

Is there a way to manage the workers effectively and shut them down properly?
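The hang described above can be reproduced without any network code. The following is a minimal sketch, not the asker's actual program: `worker`, `demo`, and the 0.2 second timeout are made up for illustration, and `asyncio.wait_for` is used only so that the demonstration itself terminates.

```python
import asyncio


async def worker(queue, processed):
    # Same shape as the workers in the question: loop forever, never return.
    while True:
        item = await queue.get()
        processed.append(item)


async def demo():
    queue = asyncio.Queue()
    processed = []
    tasks = [asyncio.create_task(worker(queue, processed)) for _ in range(3)]
    for i in range(5):
        queue.put_nowait(i)
    try:
        # gather() returns only when every worker returns, which never happens,
        # so the timeout is the only way out. wait_for() cancels the workers.
        await asyncio.wait_for(asyncio.gather(*tasks), timeout=0.2)
        hung = False
    except asyncio.TimeoutError:
        hung = True
    return sorted(processed), hung
```

All five items are processed almost immediately, yet `gather` is still waiting when the timeout fires: the workers have no way of knowing that no more items are coming.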

Recommended answer

As nicely explained in this answer, Queue.join serves to inform the producer when all the work injected into the queue has been completed. Since your first queue doesn't know when a particular item is done (it is multiplied and distributed to the other queues), join is not the right tool for you.
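To make the role of Queue.join concrete, here is a minimal sketch with a single queue; `consumer` and `demo_join` are illustrative names, not part of the question's code. join() returns once task_done() has been called for every item that was put into the queue. It is a coroutine, so awaiting it suspends only the caller while other tasks keep running.

```python
import asyncio


async def consumer(queue, done):
    while True:
        item = await queue.get()
        await asyncio.sleep(0)   # stand-in for real work
        done.append(item)
        queue.task_done()        # exactly one task_done() per get()


async def demo_join():
    queue = asyncio.Queue()
    done = []
    task = asyncio.create_task(consumer(queue, done))
    for i in range(3):
        queue.put_nowait(i)
    await queue.join()           # resumes after the third task_done()
    finished = list(done)
    task.cancel()                # join() does not stop the consumer
    await asyncio.gather(task, return_exceptions=True)
    return finished
```

Note what join() reports here: that this one queue's items were marked done. In a three-stage pipeline, queue1 being done says nothing about the items its workers have pushed on to queue2 and queue3.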

Judging from your code, it seems that your workers need to run only for as long as it takes to process the queue's initial items. If that is the case, you can use a shutdown sentinel to signal the workers to exit. For example:

async with aiohttp.ClientSession() as session:

    # ... create tasks as above ...

    for i in IDs:
        queue1.put_nowait(i)
    queue1.put_nowait(None)  # no more work

    await asyncio.gather(*tasks)

This is like your original code, but with an explicit shutdown request. Workers must detect the sentinel and react accordingly: propagate it to the next queue/worker and exit. For example, in worker1:

while True:
    item = await queue1.get()  # get() is a coroutine and must be awaited
    if item is None:
        # done with processing, propagate sentinel to worker2 and exit
        await queue2.put(None)
        break
    # ... process item as usual ...

Doing the same in the other two workers (except for worker3, which won't propagate because there is no next queue) will result in all three tasks completing once the work is done. Since queues are FIFO, the workers can safely exit after encountering the sentinel, knowing that no items have been dropped. The explicit shutdown also distinguishes a shut-down queue from one that merely happens to be empty, which prevents workers from exiting prematurely because of a temporarily empty queue.
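One caveat when several workers share a queue, as in the question (3, 3 and 10 tasks): each worker that reads a sentinel exits, so a single None stops only one of them, and a worker that forwards the sentinel while a sibling is still busy could let items arrive behind it. A possible way around this, sketched below under stated assumptions, is to send one sentinel per worker and to shut the stages down in order from a coordinator instead of propagating the sentinel from inside the workers. The names `STOP`, `stop_stage`, `fetch1`, `fetch2`, `commit`, and `run_pipeline` are hypothetical, and the sleeps stand in for the HTTP requests and database commits.

```python
import asyncio

STOP = None  # shutdown sentinel, as in the answer


async def worker(inbox, outbox, handle):
    """Generic stage worker: process items until a sentinel arrives."""
    while True:
        item = await inbox.get()
        if item is STOP:
            break
        for result in await handle(item):
            await outbox.put(result)


async def stop_stage(inbox, tasks):
    """Send one sentinel per worker, then wait for the whole stage to exit."""
    for _ in tasks:
        await inbox.put(STOP)
    await asyncio.gather(*tasks)


async def fetch1(item_id):
    await asyncio.sleep(0.01)                    # stand-in for request + commit
    return [item_id]                             # same ID goes on to queue2


async def fetch2(item_id):
    await asyncio.sleep(0.01)
    return [f"{item_id}-{n}" for n in range(2)]  # fan out into new IDs


async def run_pipeline(ids):
    queue1, queue2, queue3 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    committed = []

    async def commit(item_id):
        await asyncio.sleep(0.01)
        committed.append(item_id)
        return []                                # last stage: nothing to forward

    stage1 = [asyncio.create_task(worker(queue1, queue2, fetch1)) for _ in range(3)]
    stage2 = [asyncio.create_task(worker(queue2, queue3, fetch2)) for _ in range(3)]
    stage3 = [asyncio.create_task(worker(queue3, None, commit)) for _ in range(10)]

    for item_id in ids:
        queue1.put_nowait(item_id)

    # A stage receives its sentinels only after every worker of the previous
    # stage has exited, so no item can end up behind a sentinel.
    await stop_stage(queue1, stage1)
    await stop_stage(queue2, stage2)
    await stop_stage(queue3, stage3)
    return committed
```

Calling `asyncio.run(run_pipeline([1, 2, 3]))` returns once all six derived IDs have been committed. The later stages keep working while the earlier ones are being shut down, so the ordered shutdown does not serialize the pipeline.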

This technique is actually demonstrated in the documentation of Queue, but that example somewhat confusingly shows both the use of Queue.join and the use of a shutdown sentinel. The two are separate and can be used independently of one another. (It might also make sense to use them together, e.g. to use Queue.join to wait for a "milestone" and then put other items in the queue, while reserving the sentinel for stopping the workers.)
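The combined use mentioned in the parenthesis could look roughly like this. It is a sketch with made-up names (`worker`, `demo_milestones`, the item labels): join() marks the point where the first batch is fully processed, and the sentinel is reserved for the final shutdown.

```python
import asyncio


async def worker(queue, log):
    while True:
        item = await queue.get()
        try:
            if item is None:       # sentinel: stop this worker
                break
            log.append(item)
        finally:
            queue.task_done()      # count the sentinel too, so join() stays balanced


async def demo_milestones():
    queue = asyncio.Queue()
    log = []
    task = asyncio.create_task(worker(queue, log))

    for item in ("a1", "a2"):
        queue.put_nowait(item)
    await queue.join()             # milestone: first batch fully processed
    after_first_batch = list(log)

    for item in ("b1", "b2"):
        queue.put_nowait(item)
    queue.put_nowait(None)         # no more work
    await task                     # the worker exits after reading the sentinel
    return after_first_batch, log
```

The worker stays alive across the milestone because join() only waits; it is the sentinel, not the empty queue, that ends the loop.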
