How to add tasks to a Pool in a task executed by the same Pool?
Problem description
Suppose I have a task executed by a multiprocessing.Pool. How do I allow this task to add new tasks to the Pool executing it? For example,
import multiprocessing

def integers(pool, queue, n):
    print("integers(%d)" % n)
    queue.put(n)
    pool.apply_async(integers, (pool, queue, n+1))  # crashes; can't pickle `pool`
def start():
    pool = multiprocessing.Pool()
    queue = multiprocessing.Queue()
    integers(pool, queue, 1)
    while True:
        yield queue.get()
Recommended answer
It's not possible to pickle a Pool, so you have to find a workaround if you want workers to be able to add tasks.
You can use a particular "sentinel" return value that tells the main program to add new tasks to the Pool:
while True:
    ret_value = queue.get()
    if is_sentinel(ret_value):
        pool.apply_async(*get_arguments(ret_value))
    yield ret_value
Where is_sentinel returns True whenever the return value requires you to add more jobs to the Pool, and get_arguments is a function that is able to fetch the arguments to be passed to the Pool.
The simplest implementation of such functions could be:
def is_sentinel(value):
    """Assume only sentinel values are tuples, or sequences."""
    return isinstance(value, tuple)
    # or, using duck-typing:
    # try:
    #     len(value)
    # except TypeError:
    #     return False
    # else:
    #     return True

def get_arguments(value):
    """Assume that the sentinel value *is* the tuple of arguments
    to pass to the Pool.
    """
    return value
    # or return value[1:] if you want to include a "normal" return value
Where the function passed to apply_async returns a tuple (or a sequence) only when it wants to add new tasks, and in this case it doesn't supply any return value. It's pretty simple to also add the possibility of providing a return value (for example: the first element of the tuple could be the "normal" return value).
A different approach could be to use a second queue where the workers can put their requests. At each iteration you can use the get_nowait() method to see whether the workers requested to add more jobs on the queue.
Your example using the first approach:
def is_sentinel(value):
    return isinstance(value, tuple)

def get_arguments(value):
    return value

def integers(queue, n1, n2):
    print("integers(%d)" % n1)
    queue.put(n1)
    if n1 < n2:
        queue.put((integers, (queue, n1+1, n2)))

def start():
    pool = multiprocessing.Pool()
    # Note: a plain multiprocessing.Queue cannot be passed to Pool workers;
    # a Manager().Queue() proxy can.
    queue = multiprocessing.Manager().Queue()
    m = 0
    n = 100
    pool.apply_async(integers, (queue, m, n))
    while True:
        ret_val = queue.get()
        if is_sentinel(ret_val):
            pool.apply_async(*get_arguments(ret_val))
        else:
            yield ret_val
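Putting the pieces together, here is one minimal, self-contained sketch of the sentinel approach. It is bounded to a small range so it terminates, and the `start(n)` signature, the `produced` counter, and the use of a Manager queue are assumptions of this sketch (a plain multiprocessing.Queue can't be shipped to Pool workers as an argument):

```python
import multiprocessing

def is_sentinel(value):
    # Convention from the answer above: a tuple means "schedule more work".
    return isinstance(value, tuple)

def get_arguments(value):
    return value

def integers(queue, n1, n2):
    queue.put(n1)
    if n1 < n2:
        # Ask the main process to schedule the next task.
        queue.put((integers, (queue, n1 + 1, n2)))

def start(n):
    pool = multiprocessing.Pool()
    # A plain multiprocessing.Queue can't be passed to Pool workers,
    # but a Manager().Queue() proxy can.
    queue = multiprocessing.Manager().Queue()
    pool.apply_async(integers, (queue, 0, n))
    produced = 0
    while produced <= n:  # stop once 0..n have all been yielded
        ret_val = queue.get()
        if is_sentinel(ret_val):
            pool.apply_async(*get_arguments(ret_val))
        else:
            yield ret_val
            produced += 1
    pool.terminate()

if __name__ == "__main__":
    print(list(start(5)))  # → [0, 1, 2, 3, 4, 5]
```

Because only one task is in flight at a time (each task queues its successor through the main loop), the output arrives in order.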
Your example using the second approach:
import queue  # needed for the queue.Empty exception

def integers(out_queue, task_queue, n1, n2):
    print("integers(%d)" % n1)
    out_queue.put(n1)
    if n1 < n2:
        task_queue.put((n1+1, n2))

def start():
    pool = multiprocessing.Pool()
    # As above, Pool workers need managed queues, not plain
    # multiprocessing.Queue objects.
    manager = multiprocessing.Manager()
    out_queue = manager.Queue()
    task_queue = manager.Queue()
    task_queue.put((0, 100))
    while True:
        try:
            # may be safer to use a timeout...
            m, n = task_queue.get_nowait()
            pool.apply_async(integers, (out_queue, task_queue, m, n))
        except queue.Empty:
            # no task to perform
            pass
        yield out_queue.get()
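As the "may be safer to use a timeout" comment hints, get_nowait() can race with the worker: if the main loop checks the task queue before the worker has posted its follow-up task, the next out_queue.get() blocks with no new task ever scheduled. One way to sketch a deterministic variant (the reordered worker, the `collect(n)` helper, and the `hi` bound name are assumptions of this sketch, not part of the original answer) is to have the worker post the follow-up task *before* the result, so that by the time the main loop has received a value, the next task is guaranteed to be visible:

```python
import multiprocessing
import queue  # for the queue.Empty exception

def integers(out_queue, task_queue, n1, n2):
    # Post the follow-up task before the result: once the main loop
    # receives n1, the next task is already in task_queue, so
    # get_nowait() never misses it.
    if n1 < n2:
        task_queue.put((n1 + 1, n2))
    out_queue.put(n1)

def collect(n):
    pool = multiprocessing.Pool()
    manager = multiprocessing.Manager()  # Pool workers need managed queues
    out_queue = manager.Queue()
    task_queue = manager.Queue()
    task_queue.put((0, n))
    results = []
    while len(results) <= n:  # collect 0..n, then stop
        try:
            m, hi = task_queue.get_nowait()
            pool.apply_async(integers, (out_queue, task_queue, m, hi))
        except queue.Empty:
            pass  # no new task this round
        results.append(out_queue.get())
    pool.terminate()
    return results

if __name__ == "__main__":
    print(collect(5))  # → [0, 1, 2, 3, 4, 5]
```

The reordering matters only because a single chain of tasks is in flight; with independent concurrent workers you would still want a timeout or a proper termination protocol.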