python pool apply_async and map_async do not block on full queue
Problem description
I am fairly new to Python. I am using the multiprocessing module to read lines of text from stdin, transform them in some way, and write them into a database. Here's a snippet of my code:
import multiprocessing
import sys

# insert(batch, batch_no) writes one batch to the database (defined elsewhere)
batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        pool.apply_async(insert, args=(batch, i + 1))
        batch = []
pool.apply_async(insert, args=(batch, i))
pool.close()
pool.join()
All of that works fine until I process huge input files (hundreds of millions of lines) that I pipe into my Python program. At some point, when my database slows down, I see memory filling up.
After some experimenting, it turned out that neither pool.apply_async nor pool.map_async ever blocks, so the internal queue of pending calls grows bigger and bigger.
What is the correct approach to my problem? I would expect a parameter I can set that makes the pool.apply_async call block as soon as a certain queue length has been reached. AFAIR, in Java one can give a ThreadPoolExecutor a BlockingQueue with a fixed length for that purpose.
Thanks!
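(One common workaround for the missing back-pressure, short of abandoning Pool, is to gate apply_async behind a bounded semaphore that the result callback releases. A minimal sketch, shown with multiprocessing.pool.ThreadPool so it runs as-is; the names submit_throttled and max_pending, and the stand-in insert function, are illustrative, not part of the original question:)

```python
import threading
from multiprocessing.pool import ThreadPool  # same apply_async API as multiprocessing.Pool

def insert(batch, batch_no):
    # hypothetical stand-in for the real database insert
    return batch_no

def submit_throttled(batches, max_pending=8, workers=4):
    # Allow at most max_pending batches in flight; acquire() blocks the
    # producer when the limit is reached, giving Pool the missing back-pressure.
    sem = threading.BoundedSemaphore(max_pending)
    results = []

    def release(_):
        sem.release()  # a slot frees up when a batch finishes (or fails)

    pool = ThreadPool(workers)
    for no, batch in enumerate(batches):
        sem.acquire()  # blocks here instead of letting the task queue grow
        results.append(pool.apply_async(insert, args=(batch, no),
                                        callback=release, error_callback=release))
    pool.close()
    pool.join()
    return [r.get() for r in results]
```

The same gating works with a real multiprocessing.Pool, since callbacks run in the parent process.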
Just in case someone ends up here, this is how I solved the problem: I stopped using multiprocessing.Pool. Here is how I do it now:
import multiprocessing
import sys

# set the number of concurrent processes that insert db data
processes = multiprocessing.cpu_count() * 2

# set up the batch queue; put() blocks once it holds processes * 2 batches
queue = multiprocessing.Queue(processes * 2)

# start the processes
for _ in range(processes):
    multiprocessing.Process(target=insert, args=(queue,)).start()

# fill the queue with batches
batch = []
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        queue.put((batch, i + 1))
        batch = []
if batch:
    queue.put((batch, i + 1))

# stop the processes with one poison pill per worker
for _ in range(processes):
    queue.put((None, None))

print("all done.")
In the insert method, the processing of each batch is wrapped in a loop that pulls from the queue until it receives the poison pill:
while True:
    batch, end = queue.get()
    if batch is None and end is None:
        break  # poison pill! complete!
    # [process the batch]
print("worker done.")
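The whole pattern, a bounded queue feeding long-lived workers that stop on a poison pill, can be sketched self-contained with the stdlib queue module. Threads stand in for processes here so the example runs as-is; run_pipeline, batch_size, and extending a shared results list (in place of a database insert) are illustrative choices, not the original code:

```python
import queue
import threading

def insert(q, results):
    # worker: pull batches until the poison pill (None, None) arrives
    while True:
        batch, end = q.get()
        if batch is None and end is None:
            break  # poison pill! complete!
        results.extend(batch)  # stand-in for the database insert

def run_pipeline(lines, batch_size=3, workers=2, max_queued=4):
    q = queue.Queue(maxsize=max_queued)  # put() blocks when the queue is full
    results = []
    threads = [threading.Thread(target=insert, args=(q, results))
               for _ in range(workers)]
    for t in threads:
        t.start()
    batch = []
    i = 0
    for i, content in enumerate(lines):
        batch.append(content)
        if len(batch) >= batch_size:
            q.put((batch, i + 1))  # blocks once max_queued batches are waiting
            batch = []
    if batch:
        q.put((batch, i + 1))
    for _ in range(workers):  # one pill per worker
        q.put((None, None))
    for t in threads:
        t.join()
    return sorted(results)
```

The bounded queue is what gives the producer back-pressure: once max_queued batches are waiting, q.put() blocks until a worker catches up, which is exactly the behavior Pool.apply_async lacks.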