multiprocessing Pool and generators


Question

First look at the following code:

import multiprocessing

pool = multiprocessing.Pool(processes=N)
batch = []
for item in generator():
    batch.append(item)
    if len(batch) == 10:
        pool.apply_async(my_fun, args=(batch,))
        batch = []
# leftovers (skip the submission if the final batch is empty)
if batch:
    pool.apply_async(my_fun, args=(batch,))

Essentially, I'm retrieving data from a generator, collecting it into a list, and then spawning a process that consumes the batch of data.

This may look fine, but when the consumers (i.e. the pool processes) are slower than the producer (i.e. the generator), the memory usage of the main process grows until the generator stops or... the system runs out of memory.

How can I avoid this problem?

Solution

You might want to use a limited-size queue in this case.

q = multiprocessing.Queue(maxsize)

When constructed with a maximum size, the queue does the necessary counting for you and blocks the thread calling q.put() once it is full, so you can never have more than a certain number of work items posted on it, which limits the memory needed to store the pending items.
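A minimal sketch of the bounded-queue approach. The `worker`/`run` helpers, the batch size, and the item counts are all hypothetical, and `my_fun` just sums its batch as a stand-in for real work; `range(25)` stands in for `generator()`:

```python
import multiprocessing

def my_fun(batch):
    # Placeholder work: sum the batch (stands in for the real processing).
    return sum(batch)

def worker(q, results):
    # Consume batches until a None sentinel arrives.
    for batch in iter(q.get, None):
        results.put(my_fun(batch))

def run(n_workers=2, maxsize=4):
    q = multiprocessing.Queue(maxsize)     # put() blocks once maxsize batches are pending
    results = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(q, results))
             for _ in range(n_workers)]
    for p in procs:
        p.start()

    batch, n_batches = [], 0
    for item in range(25):                 # stands in for generator()
        batch.append(item)
        if len(batch) == 10:
            q.put(batch)                   # blocks here if the consumers lag behind
            n_batches += 1
            batch = []
    if batch:                              # leftovers
        q.put(batch)
        n_batches += 1

    for _ in procs:
        q.put(None)                        # one sentinel per worker
    total = sum(results.get() for _ in range(n_batches))
    for p in procs:
        p.join()
    return total

if __name__ == "__main__":
    print(run())
```

Because `q.put()` blocks when `maxsize` batches are already pending, the main process can never run further ahead of the workers than the queue allows, regardless of how slow the consumers are.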

Alternatively, you could use a counting semaphore (e.g., multiprocessing.BoundedSemaphore(maxSize)). Acquire it each time you get a work item from the generator and release it in your work function (my_fun) once the item is processed. This way, the maximum number of work items waiting to be processed will never exceed the initial value of the semaphore.
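A sketch of the semaphore variant, with the same placeholder `my_fun` and hypothetical `run`/`on_done` names. One deviation from the text above: a `multiprocessing` semaphore cannot be passed to a `Pool` worker as a pickled argument, so instead of releasing it inside `my_fun`, this sketch releases it from the `apply_async` callback (which runs in the parent process once the batch is done), acquiring once per batch rather than per item:

```python
import multiprocessing

def my_fun(batch):
    # Placeholder work: sum the batch (stands in for the real processing).
    return sum(batch)

def run(maxsize=4, n_items=25):
    # At most maxsize batches may be in flight at any time.
    sem = multiprocessing.BoundedSemaphore(maxsize)
    results = []

    def on_done(result):
        results.append(result)
        sem.release()          # a slot frees up once the batch is processed

    with multiprocessing.Pool(processes=2) as pool:
        batch = []
        for item in range(n_items):        # stands in for generator()
            batch.append(item)
            if len(batch) == 10:
                sem.acquire()              # blocks when maxsize batches are pending
                pool.apply_async(my_fun, args=(batch,), callback=on_done)
                batch = []
        if batch:                          # leftovers
            sem.acquire()
            pool.apply_async(my_fun, args=(batch,), callback=on_done)
        pool.close()
        pool.join()                        # all callbacks have run after join()
    return sum(results)

if __name__ == "__main__":
    print(run())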

