Read in parallel and write sequentially?

Problem description

I have the following code, which reads and writes for each id sequentially.

async def main():
    # read_async() and data.write_async() are existing coroutines defined elsewhere
    id = 0
    while id < 1000:
        data = await read_async(id)
        await data.write_async(f'{id}.csv')
        id += 1

read_async() takes several minutes and write_async() takes less than one minute to run. Now I want to:

  1. Run read_async(id) in parallel. However, at most 3 calls can run in parallel because of a memory limitation.
  2. write_async has to run sequentially, i.e., write_async(n+1) cannot run before write_async(n).
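
The two constraints above can be pictured with a minimal sketch (not part of the original question or answer), assuming read_async() and write_async() behave as described: an asyncio.Semaphore caps the number of concurrent reads at 3, and awaiting the read tasks in id order keeps the writes strictly sequential.

import asyncio

async def sketch_main():
    sem = asyncio.Semaphore(3)                 # at most 3 reads in flight at once

    async def limited_read(id):
        async with sem:                        # take one of the 3 read slots
            return await read_async(id)

    # start all reads up front; the semaphore keeps only 3 running at a time
    tasks = [asyncio.create_task(limited_read(id)) for id in range(1000)]

    # await the tasks in id order so the writes happen strictly in order
    for id, task in enumerate(tasks):
        data = await task
        await data.write_async(f'{id}.csv')

Unlike the accepted answer below, this creates all 1000 task objects up front instead of feeding a fixed pool of reader tasks from a queue.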

Recommended answer

You could use a queue and a fixed number of tasks for reading, and write from the main task. The main task can use an event to find out that new data is available from the readers, and a shared dict to get it from them. For example (untested):

import asyncio

async def reader(q, id_to_data, data_ready):
    while True:
        id = await q.get()
        data = await read_async(id)
        id_to_data[id] = data
        data_ready.set()           # signal the writer that new data is available

async def main():
    q = asyncio.Queue()
    for id in range(1000):
        await q.put(id)

    id_to_data = {}
    data_ready = asyncio.Event()
    # three reader tasks, so at most 3 read_async() calls run in parallel
    readers = [asyncio.create_task(reader(q, id_to_data, data_ready))
               for _ in range(3)]

    for id in range(1000):
        while True:
            # wait for the current ID to appear before writing
            if id in id_to_data:
                data = id_to_data.pop(id)
                await data.write_async(f'{id}.csv')
                break  # move on to the next ID
            else:
                # wait for new data and try again
                await data_ready.wait()
                data_ready.clear()

    for r in readers:
        r.cancel()
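
Not part of the original answer: a small, hypothetical test harness for the code above, with dummy stand-ins for read_async() and the object it returns. FakeData, the sleep durations, and the print call are all invented for illustration; the random read delay just makes the reads finish out of order.

import asyncio
import random

class FakeData:
    """Stand-in for whatever read_async() returns in the real code."""
    def __init__(self, id):
        self.id = id

    async def write_async(self, path):
        await asyncio.sleep(0.001)                       # pretend-write, fast
        print(f'wrote {path}')

async def read_async(id):
    await asyncio.sleep(random.uniform(0.01, 0.05))      # pretend-read, slower
    return FakeData(id)

asyncio.run(main())   # prints 'wrote 0.csv', 'wrote 1.csv', ... strictly in order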

Using a separate queue for results instead of the event wouldn't work because a queue is unordered. A priority queue would fix that, but it would still immediately return the lowest id currently available, whereas the writer needs the next id in order to process all ids in order.
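
To make that concrete, here is a tiny illustrative snippet (the ids are made up): an asyncio.PriorityQueue hands back the lowest id that has finished so far, which is not necessarily the next id the writer is allowed to write.

import asyncio

async def demo():
    results = asyncio.PriorityQueue()
    # reads finished out of order: ids 2 and 3 are done, id 1 is still running
    await results.put(2)
    await results.put(3)

    next_to_write = 1
    finished = await results.get()
    # the queue returns 2 (the lowest available id), but the writer still
    # needs id 1 first, so it cannot write anything yet
    print(finished, next_to_write)   # -> 2 1

asyncio.run(demo())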
