python pool apply_async and map_async do not block on full queue


Problem description



I am fairly new to Python. I am using the multiprocessing module to read lines of text from stdin, transform them in some way, and write them into a database. Here's a snippet of my code:

import multiprocessing
import sys

# insert() is defined elsewhere; it writes one batch of lines to the database
batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        pool.apply_async(insert, args=(batch, i + 1))
        batch = []
pool.apply_async(insert, args=(batch, i))  # flush the final, partial batch
pool.close()
pool.join()

Now, all of that works fine until I get to huge input files (hundreds of millions of lines) that I pipe into my Python program. At some point, when my database gets slower, I see memory filling up.

After some playing around, it turned out that pool.apply_async, as well as pool.map_async, never block, so the internal queue of pending calls grows bigger and bigger.
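A toy version of that behaviour (the worker function and the numbers here are purely illustrative) is easy to reproduce: the submissions are accepted in a fraction of a second, even though two workers would need several minutes to drain them, so everything simply piles up in the parent process:

import multiprocessing
import time

def slow(x):
    time.sleep(0.01)          # pretend this is a slow database insert
    return x

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    start = time.time()
    # 100,000 submissions are accepted immediately; apply_async never waits for the workers
    results = [pool.apply_async(slow, (n,)) for n in range(100000)]
    print("%d tasks submitted in %.2f s" % (len(results), time.time() - start))
    pool.terminate()          # don't actually wait the ~8 minutes of queued work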

What is the correct approach to my problem? I would expect a parameter I can set that blocks the pool.apply_async call as soon as a certain queue length has been reached. AFAIR, in Java one can give a ThreadPoolExecutor a BlockingQueue with a fixed length for exactly that purpose.
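In other words, I am after something that behaves roughly like the following sketch, where a semaphore makes the producer block once too many batches are in flight (the limit value, the release helper and the callback wiring below are only illustrative, not part of my actual code):

import multiprocessing
import sys
import threading

def insert(batch, batch_id):
    ...  # stand-in for the real database insert

if __name__ == "__main__":
    limit = 40                              # illustrative cap on batches in flight
    in_flight = threading.BoundedSemaphore(limit)
    pool = multiprocessing.Pool(20)

    def done(_):                            # runs in the parent when a task finishes or fails
        in_flight.release()

    batch, i = [], 0
    for i, content in enumerate(sys.stdin):
        batch.append(content)
        if len(batch) >= 10000:
            in_flight.acquire()             # blocks here once `limit` batches are pending
            pool.apply_async(insert, args=(batch, i + 1), callback=done, error_callback=done)
            batch = []
    if batch:
        in_flight.acquire()
        pool.apply_async(insert, args=(batch, i + 1), callback=done, error_callback=done)
    pool.close()
    pool.join()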

Thanks!

Solution

Just in case someone ends up here, this is how I solved the problem: I stopped using multiprocessing.Pool. Here is how I do it now:

import multiprocessing
import sys

# number of concurrent processes that insert the data into the database
processes = multiprocessing.cpu_count() * 2

# bounded batch queue: put() blocks once it holds processes * 2 batches
queue = multiprocessing.Queue(processes * 2)

# start the worker processes (insert() is shown below)
for _ in range(processes):
    multiprocessing.Process(target=insert, args=(queue,)).start()

# fill the queue with batches
batch = []
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        queue.put((batch, i + 1))
        batch = []
if batch:
    queue.put((batch, i + 1))

# stop the workers with one poison pill per process
for _ in range(processes):
    queue.put((None, None))

print("all done.")

In the insert method, the processing of each batch is wrapped in a loop that pulls from the queue until it receives the poison pill. Because the queue is created with a fixed maxsize, queue.put blocks in the main process as soon as the workers fall behind, which is exactly the back-pressure that apply_async never provided:

while True:
    batch, end = queue.get()
    if not batch and not end:   # poison pill! complete!
        break
    # ... process the batch ...
print("worker done.")
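For completeness, here is a hedged sketch of what such an insert() worker could look like end to end; the sqlite3 database, table and column names are illustrative assumptions, since the original answer never says which database is used:

import sqlite3

def insert(queue):
    # each worker opens its own connection (illustrative sqlite3 target)
    conn = sqlite3.connect("lines.db")
    conn.execute("CREATE TABLE IF NOT EXISTS lines (batch_id INTEGER, content TEXT)")
    while True:
        batch, end = queue.get()
        if not batch and not end:   # poison pill! complete!
            break
        conn.executemany("INSERT INTO lines (batch_id, content) VALUES (?, ?)",
                         [(end, line) for line in batch])
        conn.commit()
    conn.close()
    print("worker done.")

With several concurrent writer processes, SQLite's file locking would quickly become the bottleneck, so treat this only as the shape of the loop; the original setup presumably targets a server database.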
