No performance gain after using multiprocessing for a queue-oriented function


Question

The real code I want to optimize is too complicated to be included here, so here is a simplified example:

def enumerate_paths(n, k):
    """
    John wants to go up a flight of stairs that has N steps. He can take
    up to K steps each time. This function enumerates all the different
    ways he can go up this flight of stairs.
    """
    paths = []
    to_analyze = [(0,)]

    while to_analyze:
        path = to_analyze.pop()
        last_step = path[-1]

        if last_step >= n:
            # John has reached the top
            paths.append(path)
            continue

        for i in range(1, k + 1):
            # possible paths from this point
            extended_path = path + (last_step + i,)
            to_analyze.append(extended_path)

    return paths

The output looks like this:

>>> enumerate_paths(3, 2)
[(0, 2, 4), (0, 2, 3), (0, 1, 3), (0, 1, 2, 4), (0, 1, 2, 3)]

You may find the result confusing, so here is an explanation. For example, (0, 1, 2, 4) means John places his foot on the first, second and fourth steps in chronological order, and he finally stops at step 4 because he only needs to go up 3 steps.

I tried to incorporate multiprocessing into this snippet, but observed no performance gain, not even a little!

import multiprocessing

def enumerate_paths_worker(n, k, queue):
    paths = []

    while not queue.empty():
        path = queue.get()
        last_step = path[-1]

        if last_step >= n:
            # John has reached the top
            paths.append(path)
            continue

        for i in range(1, k + 1):
            # possible paths from this point
            extended_path = path + (last_step + i,)
            queue.put(extended_path)

    return paths


def enumerate_paths(n, k):
    pool = multiprocessing.Pool()
    manager = multiprocessing.Manager()
    queue = manager.Queue()

    path_init = (0,)
    queue.put(path_init)
    apply_result = pool.apply_async(enumerate_paths_worker, (n, k, queue))

    return apply_result.get()

The Python list to_analyze acts just like a task queue, and each item in this queue can be processed separately, so I think this function has the potential to be optimized by employing multi-threading/processing. Also, please note that the order of the items doesn't matter. In fact, when optimizing it, you may return a Python set, a Numpy array, or a Pandas data frame, as long as they represent the same set of paths.
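Since only the set of paths matters, one way to sanity-check any optimized variant against the sequential reference is an order-insensitive comparison; a minimal sketch (the helper name same_paths is hypothetical, not part of my code):

def same_paths(result_a, result_b):
    # order-insensitive comparison of two path collections
    return set(map(tuple, result_a)) == set(map(tuple, result_b))

# e.g. comparing the sequential reference against any reordered result
assert same_paths(
    enumerate_paths(3, 2),
    [(0, 1, 3), (0, 2, 3), (0, 2, 4), (0, 1, 2, 3), (0, 1, 2, 4)],
)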

Bonus Question: How much performance can I gain by using scientific packages like Numpy, Pandas or Scipy for a task like this?

Answer

TL;DR

If your real algorithm doesn't involve costlier calculations than the one in your example, the communication overhead of multiprocessing will dominate and make your computation take many times longer than sequential execution.

Your attempt with apply_async actually uses only one worker from your pool, which is why you don't see a difference. By design, apply_async feeds just a single worker at a time. Furthermore, it is not enough to simply pass the serial version into the pool if your workers need to share intermediate results, so you would have to modify your target function to enable that.
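To make the single-worker behaviour visible, here is a minimal sketch (the report helper is illustrative, not from the original post) that records which pool process handles each call; one apply_async call is served by exactly one worker, while map spreads the calls over several:

import multiprocessing
import os

def report(_):
    # return the PID of the worker process that handled this call
    return os.getpid()

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        single = pool.apply_async(report, (0,)).get()
        print('apply_async used worker:', single)           # exactly one PID

        spread = pool.map(report, range(8))
        print('map used workers:', sorted(set(spread)))     # typically several PIDs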

But as already said in the introduction, your computation will only benefit from multiprocessing if it is heavy enough to earn back the overhead of inter-process communication (and process creation).
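To get a feeling for that overhead, here is a minimal sketch (timings vary by machine, and tiny is a made-up stand-in for a cheap task) that distributes a near-trivial function over a pool and compares it with a plain loop; the pickling and dispatch costs usually dwarf the work itself:

import multiprocessing
import time

def tiny(x):
    return x + 1  # almost no computation per task

if __name__ == '__main__':
    data = list(range(100000))

    start = time.perf_counter()
    [tiny(x) for x in data]
    print(f'sequential: {time.perf_counter() - start:.3f} s')

    start = time.perf_counter()
    with multiprocessing.Pool(4) as pool:
        pool.map(tiny, data)
    print(f'pool.map:   {time.perf_counter() - start:.3f} s')  # usually slower here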

My solution below for the general problem uses JoinableQueue in combination with a sentinel value for process termination to synchronize the workflow. I'm adding a function busy_foo to make the computation heavier and to show a case where multiprocessing has its benefits.

from multiprocessing import Process
from multiprocessing import JoinableQueue as Queue
import time

SENTINEL = 'SENTINEL'

def busy_foo(x = 10e6):
    for _ in range(int(x)):
        x -= 1


def enumerate_paths(q_analyze, q_result, n, k):
    """
    John wants to go up a flight of stairs that has N steps. He can take
    up to K steps each time. This function enumerates all the different
    ways he can go up this flight of stairs.
    """
    for path in iter(q_analyze.get, SENTINEL):
        last_step = path[-1]

        if last_step >= n:
            busy_foo()
            # John has reached the top
            q_result.put(path)
            q_analyze.task_done()
            continue
        else:
            busy_foo()
            for i in range(1, k + 1):
                # possible paths from this point
                extended_path = path + (last_step + i,)
                q_analyze.put(extended_path)
            q_analyze.task_done()


if __name__ == '__main__':

    N_CORES = 4

    N = 6
    K = 2

    start = time.perf_counter()
    q_analyze = Queue()
    q_result = Queue()

    q_analyze.put((0,))

    pool = []
    for _ in range(N_CORES):
        pool.append(
            Process(target=enumerate_paths, args=(q_analyze, q_result, N, K))
        )

    for p in pool:
        p.start()

    q_analyze.join() # block until everything is processed

    for p in pool:
        q_analyze.put(SENTINEL)  # let the processes exit gracefully

    results = []
    while not q_result.empty():
        results.append(q_result.get())

    for p in pool:
        p.join()

    print(f'elapsed: {time.perf_counter() - start: .2f} s')

Results

Using the code above with busy_foo commented out, N=30, K=2 (2178309 results) takes:

  • ~208s  N_CORES=4
  • 2.78s  sequential original

Pickling and unpickling, threads running against locks, etc., account for this huge difference.

Now with busy_foo enabled for both versions and N=6, K=2 (21 results), it takes:

  • 6.45s  N_CORES=4
  • 30.46s  sequential original

Here the computation was heavy enough to allow the overhead to be earned back.

Numpy

Numpy can speed up vectorized operations many times over, but you would likely see a performance penalty with numpy on this one. Numpy uses contiguous blocks of memory for its arrays. When you change the array size, the whole array has to be rebuilt, unlike with Python lists.
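A minimal sketch of that rebuild cost (the numbers are machine-dependent; np.append is used only to make the repeated reallocation visible):

import time
import numpy as np

N = 20000

start = time.perf_counter()
arr = np.empty(0, dtype=np.int64)
for i in range(N):
    arr = np.append(arr, i)          # allocates and copies a new array every time
print(f'np.append loop:   {time.perf_counter() - start:.3f} s')

start = time.perf_counter()
lst = []
for i in range(N):
    lst.append(i)                    # amortized O(1) growth
print(f'list.append loop: {time.perf_counter() - start:.3f} s')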
