How to add a pool of processes available for a multiprocessing queue

## Question
I am following up on a preceding question here: how to add more items to a multiprocessing queue while script in motion
The code I am working with now:
```python
import multiprocessing

class MyFancyClass:
    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(proc_name, self.name))

def worker(q):
    while True:
        obj = q.get()
        if obj is None:
            break
        obj.do_something()

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))
    queue.put(MyFancyClass('Frankie'))
    # print(queue.qsize())
    queue.put(None)

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()
```
Right now, there are two items in the queue. If I replace those two lines with a list of, say, 50 items, how do I initiate a pool to make a number of processes available? For example:
```python
p = multiprocessing.Pool(processes=4)
```
Where does that go? I'd like to be able to run multiple items at once, especially if the items run for a while. Thanks!
## Answer
As a rule, you either use a `Pool`, or `Process`(es) plus `Queue`s. Mixing both is a misuse; a `Pool` already uses `Queue`s (or a similar mechanism) behind the scenes.
If you want to do this with a `Pool`, change your code to the following (moving the code into a `main` function for performance, and for better resource cleanup than running in the global scope):
```python
def main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...]  # define your MyFancyClass instances here
    with multiprocessing.Pool(processes=4) as p:
        # Submit all the work
        futures = [p.apply_async(fancy.do_something) for fancy in myfancyclasses]
        # Done submitting, let workers exit as they run out of work
        p.close()
        # Wait until all the work is finished
        for f in futures:
            f.wait()

if __name__ == '__main__':
    main()
```
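As a side note (a minimal sketch with a trivial stand-in task, not part of the question's code): when your tasks do return values, `apply_async` hands back `AsyncResult` objects, and calling `.get()` on each one both retrieves the result and re-raises any exception that occurred in the worker:

```python
import multiprocessing

def square(x):
    # Trivial stand-in task that returns a value instead of printing
    return x * x

def main():
    with multiprocessing.Pool(processes=4) as p:
        results = [p.apply_async(square, (n,)) for n in range(5)]
        p.close()
        # .get() blocks until that result is ready and re-raises
        # any exception raised inside the worker process
        values = [r.get() for r in results]
    print(values)  # -> [0, 1, 4, 9, 16]

if __name__ == '__main__':
    main()
```

Because we iterate the `results` list in submission order, the collected values come back in order even though the tasks may finish out of order.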
This could be simplified further, at the expense of purity, with the `.*map*` methods of `Pool`; e.g., to minimize memory usage, redefine `main` as:
```python
def main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...]  # define your MyFancyClass instances here
    with multiprocessing.Pool(processes=4) as p:
        # No return value, so we ignore it, but we need to run out the result
        # or the work won't be done
        for _ in p.imap_unordered(MyFancyClass.do_something, myfancyclasses):
            pass
```
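One knob worth knowing here (a hedged aside with a hypothetical `work` function, not from the question): `imap_unordered` accepts a `chunksize` argument that batches task dispatch, which cuts per-task IPC overhead substantially when you have many small work items:

```python
import multiprocessing

def work(n):
    # Cheap stand-in for MyFancyClass.do_something
    return n % 7

def main():
    items = range(10_000)
    with multiprocessing.Pool(processes=4) as p:
        # chunksize=256 sends tasks to the workers in batches of 256,
        # reducing queue traffic for many tiny tasks
        total = sum(p.imap_unordered(work, items, chunksize=256))
    print(total)  # -> 29994

if __name__ == '__main__':
    main()
```

With the default `chunksize=1`, each tiny task costs a round trip to a worker; for long-running tasks like the ones in the question, the default is usually fine.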
Yes, technically either approach has slightly higher overhead in terms of needing to serialize the return value you're not using so it can be given back to the parent process. But in practice, this cost is pretty low (since your function has no `return`, it returns `None`, which serializes to almost nothing). An advantage of this approach is that printing to the screen is something you generally don't want to do from the child processes (since they'll end up interleaving output), so you can replace the `print`ing with `return`s and let the parent do that work, e.g.:
```python
import multiprocessing

class MyFancyClass:
    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        # Changed from print to return
        return 'Doing something fancy in {} for {}!'.format(proc_name, self.name)

def main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...]  # define your MyFancyClass instances here
    with multiprocessing.Pool(processes=4) as p:
        # Using the return value now to avoid interleaved output
        for res in p.imap_unordered(MyFancyClass.do_something, myfancyclasses):
            print(res)

if __name__ == '__main__':
    main()
```
Note how all of these solutions remove the need to write your own `worker` function or manually manage `Queue`s; `Pool` does that grunt work for you.
An alternate approach using `concurrent.futures` can efficiently process results as they become available, while allowing you to choose to submit new work as you go (either based on the results, or based on external information):
```python
import concurrent.futures
from concurrent.futures import FIRST_COMPLETED

def main():
    allow_new_work = True  # Set to False to indicate we'll no longer allow new work
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...]  # define your initial MyFancyClass instances here
    with concurrent.futures.ProcessPoolExecutor() as executor:
        remaining_futures = {executor.submit(fancy.do_something)
                             for fancy in myfancyclasses}
        while remaining_futures:
            done, remaining_futures = concurrent.futures.wait(remaining_futures,
                                                              return_when=FIRST_COMPLETED)
            for fut in done:
                result = fut.result()
                # Do stuff with result, maybe submit new work in response

            if allow_new_work:
                if should_stop_checking_for_new_work():
                    allow_new_work = False
                    # Let the workers exit when all remaining tasks are done,
                    # and reject submitting more work from now on
                    executor.shutdown(wait=False)
                elif has_more_work():
                    # Assumed to return a collection of new MyFancyClass instances
                    new_fanciness = get_more_fanciness()
                    remaining_futures |= {executor.submit(fancy.do_something)
                                          for fancy in new_fanciness}
                    myfancyclasses.extend(new_fanciness)
```
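The sketch above leans on placeholder helpers (`should_stop_checking_for_new_work`, `has_more_work`, `get_more_fanciness`) that you'd supply yourself. For a runnable baseline without the dynamic-submission part, `concurrent.futures.as_completed` covers the common "handle each result as soon as it finishes" case; a minimal self-contained sketch, reusing the `MyFancyClass` from earlier:

```python
import concurrent.futures
import multiprocessing

class MyFancyClass:
    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        return 'Doing something fancy in {} for {}!'.format(proc_name, self.name)

def main():
    fancies = [MyFancyClass(name) for name in ('Fancy Dan', 'Frankie', 'Grace')]
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(fancy.do_something) for fancy in fancies]
        # as_completed yields each future as soon as its result is ready,
        # regardless of submission order
        for fut in concurrent.futures.as_completed(futures):
            print(fut.result())

if __name__ == '__main__':
    main()
```

Note that submitting the bound method `fancy.do_something` pickles the instance to send it to the worker, so `MyFancyClass` must be defined at module level, as it is here.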