How to add tasks to a Pool in a task executed by the same Pool?

Problem description

Suppose I have a task executed by a multiprocessing.Pool. How do I allow this task to add new tasks to the Pool executing it? For example,

import multiprocessing

def integers(pool, queue, n):
  print("integers(%d)" % n)
  queue.put(n)
  pool.apply_async(integers, (pool, queue, n+1))  # crashes; can't pickle `pool`

def start():
  pool  = multiprocessing.Pool()
  queue = multiprocessing.Queue()
  integers(pool, queue, 1)
  while True:
    yield queue.get()

Recommended answer

It's not possible to pickle a Pool, so you have to find a workaround if you want workers to be able to add tasks.
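
One way to confirm the limitation is to try pickling a pool directly. In CPython 3, `multiprocessing.pool.Pool` refuses pickling in its `__reduce__` method by raising NotImplementedError. The sketch below uses `ThreadPool`, a subclass of `Pool`, only so that it needs no child processes. `try_pickle` is a hypothetical helper and not a library function.

```python
import pickle
from multiprocessing.pool import ThreadPool


def try_pickle(obj):
    """Hypothetical helper: return None if obj can be pickled,
    otherwise the name of the exception that pickling raised."""
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return type(exc).__name__
    return None


if __name__ == "__main__":
    # ThreadPool inherits __reduce__ from multiprocessing.pool.Pool,
    # so it is refused the same way a process pool is.
    with ThreadPool(1) as pool:
        print("pool:", try_pickle(pool))
    # Plain task arguments, such as a tuple of ints, pickle without trouble.
    print("tuple:", try_pickle((1, 2, 3)))
```

Every argument handed to `apply_async` has to survive this round trip, and that is why a task cannot receive the pool itself.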

You can have the task send a particular "sentinel" value (here, through the result queue) that tells the main program to add new tasks to the Pool:

while True:
    ret_value = queue.get()
    if is_sentinel(ret_value):
        # A request for more work: schedule it instead of yielding it.
        pool.apply_async(*get_arguments(ret_value))
    else:
        yield ret_value

Here is_sentinel returns True whenever the received value asks the main program to add more jobs to the Pool. get_arguments extracts from that value the arguments to pass to pool.apply_async.

The simplest implementation of such functions could be:

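def is_sentinel(value):
    """Assume that only sentinel values are tuples."""
    return isinstance(value, tuple)


def is_sentinel_duck_typed(value):
    """Alternative using duck typing: any sized value counts as a sentinel.
    Beware that str, bytes and lists also have a length."""
    try:
        len(value)
    except TypeError:
        return False
    return True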


def get_arguments(value):
    """Assume that the sentinel value *is* the tuple of arguments
    to pass to the Pool.
    """
    return value
    # or return value[1:] if you want to include a "normal" return value

This assumes that the function passed to apply_async puts a tuple (or another sequence) on the queue only when it wants to add new tasks, and that such a sentinel carries no ordinary result. A sentinel can easily carry a result as well. For example, the first element of the tuple could be the "normal" return value, and get_arguments would then return value[1:].
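
The following sketches that extension. It assumes a sentinel has the shape `(result, func, args)`: the first element is the normal return value, and the rest is what `pool.apply_async` needs. The helper `split_message` and the sample task `double` are illustrative names only.

```python
def split_message(value):
    """Hypothetical helper: split a queue item into (result, task).

    Assumes a sentinel is a tuple (result, func, args), where (func, args)
    are the arguments for pool.apply_async. Any non-tuple is a plain
    result and carries no new task.
    """
    if isinstance(value, tuple):
        # Same idea as get_arguments returning value[1:].
        return value[0], value[1:]
    return value, None


def double(x):
    # Example task function a sentinel might ask the main program to schedule.
    return 2 * x
```

In the main loop, a non-None task would be scheduled with `pool.apply_async(*task)`, and the result would be yielded as usual.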

A different approach is to use a second queue on which the workers put their requests for new tasks. On each iteration of the main loop, you can call that queue's get_nowait() method to check, without blocking, whether a worker has asked for more jobs to be added.
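
A small helper can perform this non-blocking check by collecting every request currently waiting on the second queue. `drain_requests` is a hypothetical name. It relies only on `get_nowait()` raising `queue.Empty` when nothing is available, which is how `queue.Queue`, `multiprocessing.Queue` and Manager queue proxies behave.

```python
import queue


def drain_requests(task_queue):
    """Hypothetical helper: return all requests currently in task_queue.

    Never blocks: get_nowait() raises queue.Empty as soon as nothing is left.
    """
    requests = []
    while True:
        try:
            requests.append(task_queue.get_nowait())
        except queue.Empty:
            return requests
```

The multiprocessing documentation notes one caveat. After an object is put on an empty `multiprocessing.Queue`, there may be a short delay before `get_nowait()` can return it. An empty result therefore only means "nothing has arrived yet".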

Your example using the first approach:

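import multiprocessing


def is_sentinel(value):
    return isinstance(value, tuple)

def get_arguments(value):
    return value


def init_worker(q):
    # A multiprocessing.Queue cannot be passed as an apply_async argument
    # (it may only be shared through inheritance), so give it to each
    # worker once, when the worker process starts.
    global queue
    queue = q


def integers(n1, n2):
    print("integers(%d)" % n1)
    queue.put(n1)
    if n1 < n2:
        # Sentinel: ask the main program to schedule the next task.
        queue.put((integers, (n1+1, n2)))

def start():
    queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(initializer=init_worker, initargs=(queue,))
    m = 0
    n = 100
    pool.apply_async(integers, (m, n))
    # Once the chain ends, queue.get() blocks forever; the caller decides
    # how many values to consume.
    while True:
        ret_val = queue.get()
        if is_sentinel(ret_val):
            pool.apply_async(*get_arguments(ret_val))
        else:
            yield ret_val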
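
The example above never finishes: after the last integer arrives, `queue.get()` blocks forever. One way to stop is to count pending tasks, sketched here under stated assumptions. Every task puts a completion marker as its last message (`None`, a hypothetical convention). Each sentinel adds one pending task, and the generator returns when the count reaches zero. The names `init_worker`, `run_until_idle` and `result_queue` are illustrative and not part of the original answer. The queue reaches workers through the pool initializer, a swapped-in technique, because a plain `multiprocessing.Queue` cannot be passed as a task argument.

```python
import queue
from multiprocessing.pool import ThreadPool

result_queue = None  # set in each worker by init_worker


def init_worker(q):
    """Pool initializer: store the shared queue in a module-level global."""
    global result_queue
    result_queue = q


def integers(n1, n2):
    result_queue.put(n1)
    if n1 < n2:
        # Sentinel: ask the main program to schedule the next task.
        result_queue.put((integers, (n1 + 1, n2)))
    # Completion marker; always the last item this task puts.
    result_queue.put(None)


def run_until_idle(pool, q, func, args):
    """Yield plain results from q until no task is pending.

    Assumes every task finishes by putting None, and that a task's
    sentinels reach q before its None (true for one producer on a FIFO
    queue). A task that raises never sends None, so this loop would block.
    """
    pool.apply_async(func, args)
    pending = 1
    while pending:
        value = q.get()
        if value is None:
            pending -= 1
        elif isinstance(value, tuple):
            pool.apply_async(*value)
            pending += 1
        else:
            yield value


if __name__ == "__main__":
    # Threads and queue.Queue keep this demo portable; the usage note
    # describes the process-based setup.
    q = queue.Queue()
    with ThreadPool(2, initializer=init_worker, initargs=(q,)) as pool:
        print(list(run_until_idle(pool, q, integers, (0, 5))))
```

For real worker processes, the same functions should work with `q = multiprocessing.Queue()` and `multiprocessing.Pool(initializer=init_worker, initargs=(q,))`, run under the same `__main__` guard. Handing the queue over at worker start-up is a form of inheritance that multiprocessing allows. `integers` must stay a module-level function so that it can be pickled by reference.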

Your example using the second approach:

import multiprocessing
# needed for the queue.Empty exception
import queue

def integers(out_queue, task_queue, n1, n2):
    print("integers(%d)" % n1)
    if n1 < n2:
        # Queue the request *before* publishing the result, so it is
        # already waiting when the main loop wakes up on out_queue.
        task_queue.put((n1+1, n2))
    out_queue.put(n1)


def start():
    pool = multiprocessing.Pool()
    # Plain multiprocessing.Queue objects cannot be passed as task
    # arguments, but Manager queue proxies can.
    manager = multiprocessing.Manager()
    out_queue = manager.Queue()
    task_queue = manager.Queue()
    task_queue.put((0, 100))
    while True:
        try:
            m, n = task_queue.get_nowait()
            pool.apply_async(integers, (out_queue, task_queue, m, n))
        except queue.Empty:
            # no task to perform
            pass
        yield out_queue.get()
