使用asyncio.Queue进行生产者-消费者流 [英] Using asyncio.Queue for producer-consumer flow

查看:425
本文介绍了使用asyncio.Queue进行生产者-消费者流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我对于如何将asyncio.Queue用于特定的生产者-消费者模式感到困惑,在这种模式中,生产者和消费者都可以同时并独立地进行操作.

I'm confused about how to use asyncio.Queue for a particular producer-consumer pattern in which both the producer and consumer operate concurrently and independently.

首先,请考虑以下示例,该示例紧随 docs中的示例asyncio.Queue :

First, consider this example, which closely follows that from the docs for asyncio.Queue:

import asyncio
import random
import time

async def worker(name, queue):
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        queue.task_done()
        print(f'{name} has slept for {sleep_for:0.2f} seconds')

async def main(n):
    queue = asyncio.Queue()
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)
    tasks = []
    for i in range(n):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)
    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

if __name__ == '__main__':
    import sys
    n = 3 if len(sys.argv) == 1 else sys.argv[1]
    asyncio.run(main())

有关此脚本的一个更详细的信息:通过常规循环,使用queue.put_nowait(sleep_for)将项目同步放入队列.

There is one finer detail about this script: the items are put into the queue synchronously, with queue.put_nowait(sleep_for) over a conventional for-loop.

我的目标是创建一个使用async def worker()(或consumer())和async def producer()的脚本.两者都应安排为同时运行.没有一个消费者协程明确地与生产者绑定或链接.

My goal is to create a script that uses async def worker() (or consumer()) and async def producer(). Both should be scheduled to run concurrently. No one consumer coroutine is explicitly tied to or chained from a producer.

如何修改上面的程序,以使生产者可以自己与消费者/工人同时安排协程?

How can I modify the program above so that the producer(s) is its own coroutine that can be scheduled concurrently with the consumers/workers?

PYMOTW 中的第二个示例.它要求生产者提前知道消费者的数量,并使用None作为向消费者发出生产已经完成的信号.

There is a second example from PYMOTW. It requires the producer to know the number of consumers ahead of time, and uses None as a signal to the consumer that production is done.

推荐答案

我如何修改上面的程序,以便生产者可以与消费者/工人同时调度自己的协程?

How can I modify the program above so that the producer(s) is its own coroutine that can be scheduled concurrently with the consumers/workers?

可以在不更改其基本逻辑的情况下对该示例进行概括:

The example can be generalized without changing its essential logic:

  • 将插入循环移动到单独的生产者协程.
  • 在后台启动消费者,让他们处理生产的商品.
  • 等待生产者通过await完成生产,例如await producer()await gather(*producers)等.
  • 所有生产者完成后,等待剩余的生产物品用await queue.join()
  • 进行处理.
  • 取消消费者,所有消费者现在都在闲着等待下一个永远不会到达的排队的商品.
  • Move the insertion loop to a separate producer coroutine.
  • Start the consumers in the background, letting them process the produced items.
  • Wait for the producer(s) to finish by awaiting them, as with await producer() or await gather(*producers), etc.
  • Once all producers are done, wait for the remaining produced items to be processed with await queue.join()
  • Cancel the consumers, all of which are now idly waiting for the next queued item which will never arrive.

以下是实现上述内容的示例:

Here is an example implementing the above:

import asyncio, random

async def rnd_sleep(t):
    # sleep for T seconds on average
    await asyncio.sleep(t * random.random() * 2)

async def producer(queue):
    while True:
        token = random.random()
        print(f'produced {token}')
        if token < .05:
            break
        await queue.put(token)
        await rnd_sleep(.1)

async def consumer(queue):
    while True:
        token = await queue.get()
        await rnd_sleep(.3)
        queue.task_done()
        print(f'consumed {token}')

async def main():
    queue = asyncio.Queue()

    # fire up the both producers and consumers
    producers = [asyncio.create_task(producer(queue))
                 for _ in range(3)]
    consumers = [asyncio.create_task(consumer(queue))
                 for _ in range(10)]

    # with both producers and consumers running, wait for
    # the producers to finish
    await asyncio.gather(*producers)
    print('---- done producing')

    # wait for the remaining tasks to be processed
    await queue.join()

    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()

asyncio.run(main())

这篇关于使用asyncio.Queue进行生产者-消费者流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆