why is more than one worker used in `multiprocessing.Pool().apply_async()`?

Problem

From the multiprocessing.Pool docs:

apply_async(func ...): A variant of the apply() method which returns a result object. ...

Reading further ...

apply(func[, args[, kwds]]): Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

The last bold line suggests only one worker from a pool is used. I find this is only true under certain conditions.

Given

Here is code that executes Pool.apply_async() in three similar cases. In all cases, the process id is printed.

import os
import time
import multiprocessing as mp


def blocking_func(x, delay=0.1):
    """Return a squared argument after some delay."""
    time.sleep(delay)                                  # toggle comment here
    return x*x, os.getpid()


def apply_async():
    """Execute Pool.apply_async() in three cases and print worker process ids."""
    pool = mp.Pool()
    # Case 1: From the docs
    results = [pool.apply_async(os.getpid, ()) for _ in range(10)]
    results = [res.get(timeout=1) for res in results]
    print("Docs    :", results)

    # Case 2: With delay
    results = [pool.apply_async(blocking_func, args=(i,)) for i in range(10)]
    results = [res.get(timeout=1)[1] for res in results]
    print("Delay   :", results)

    # Case 3: Without delay
    results = [pool.apply_async(blocking_func, args=(i, 0)) for i in range(10)]
    results = [res.get(timeout=1)[1] for res in results]
    print("No delay:", results)

    pool.close()
    pool.join()


if __name__ == '__main__':
    apply_async()

Results

The example from the docs (Case 1) confirms only one worker is run. We extend this example in the next cases by applying blocking_func, which blocks with some delay.

Commenting out the time.sleep() line in blocking_func() brings all cases into agreement.

# Time commented
# 1. Docs    : [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208]
# 2. Delay   : [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208]
# 3. No delay: [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208]

Each run of apply_async() creates a new process pool, which is why the process ids differ between the two result sets shown.

# Time uncommented
# 1. Docs    : [6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780]
# 2. Delay   : [6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112]
# 3. No delay: [6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112]

However, when time.sleep() is uncommented, more than one worker is used even with zero delay.

In short, with time.sleep() uncommented we expect one worker, as in Case 1, but instead we get multiple workers, as in Cases 2 and 3.

Question

Although I expect only one worker to be used by Pool().apply_async(), why are more than one used when time.sleep() is uncommented? Should blocking even affect the number of workers used by apply or apply_async?

Note: previous, related questions ask "why is only one worker used?" This question asks the opposite - "why isn't only one worker used?" I'm using 2 cores on a Windows machine.

Solution

Your confusion seems to come from thinking [pool.apply_async(...) for i in range(10)] is one call, when there are really ten independent calls. A call to any pool method is a "job". A job can lead to one or more tasks being distributed. The apply methods always produce only a single task under the hood. A task is an indivisible unit of work that is received as a whole by a random pool worker.

There is only one shared inqueue over which all workers are fed. Which idle worker wakes up from waiting to get() a task from that queue is up to the OS. That every Case 1 task landed on the same worker is still somewhat surprising and probably very lucky, at least unless you confirm you really have only two cores.

And yes, your observation for this run is also influenced by the computation time a task needs, since threads (the scheduled execution units within a process) are usually scheduled with time-slicing policies (e.g. ~20 ms on Windows).
