如何使用StdLib和Python 3在一定范围内并行化迭代? [英] How to parallelize iteration over a range, using StdLib and Python 3?

查看:73
本文介绍了如何使用StdLib和Python 3在一定范围内并行化迭代?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我几天来一直在寻找答案,但无济于事.我可能只是不了解其中漂浮的部分,并且multiprocessing模块上的Python文档相当大,我不清楚.

I've been searching for an answer on this now for days to no avail. I'm probably just not understanding the pieces that are floating around out there and the Python documentation on the multiprocessing module is rather large and not clear to me.

假设您有以下for循环:

Say you have the following for loop:

import timeit


numbers = []

start = timeit.default_timer()

for num in range(100000000):
    numbers.append(num)

end = timeit.default_timer()

print('TIME: {} seconds'.format(end - start))
print('SUM:', sum(numbers))

输出:

TIME: 23.965870224497916 seconds
SUM: 4999999950000000

在此示例中,您有一个4核处理器.是否有办法总共创建4个进程,每个进程都在单独的CPU内核上运行,并且完成速度大约快4倍,所以24s/4个进程=〜6秒?

For this example say you have a 4 core processor. Is there way to create 4 processes in total, where each process is running on a separate CPU core and finish roughly 4 times faster so 24s/4 processes = ~6 seconds?

以某种方式将for循环划分为4个相等的块,然后将这4个块添加到数字列表中以等于相同的总和?有这个stackoverflow线程:并行简单循环,但我不明白.谢谢大家.

Somehow divide the for loop up into 4 equal chunks and then have the 4 chunks added into the numbers list to equate the same sum? There was this stackoverflow thread: Parallel Simple For Loop but I don't get it. Thanks all.

推荐答案

是的,这是可行的.您的计算不依赖于中间结果,因此您可以轻松地将任务划分为多个块并将其分布在多个流程中.这就是所谓的

Yes, that is doable. Your calculation is not dependend on intermediate results, so you can easily divide the task into chunks and distribute it over multiple processes. It's what is called an

令人尴尬的并行问题.

这里唯一棘手的部分可能是,首先将范围分成相当相等的部分.理顺我的个人lib的两个函数来处理此问题:

The only tricky part here might be, to divide the range into fairly equal parts in the first place. Straight out my personal lib two functions to deal with this:

# mp_utils.py

from itertools import accumulate

def calc_batch_sizes(n_tasks: int, n_workers: int) -> list:
    """Divide `n_tasks` optimally between n_workers to get batch_sizes.

    Guarantees batch sizes won't differ for more than 1.

    Example:
    # >>>calc_batch_sizes(23, 4)
    # Out: [6, 6, 6, 5]

    In case you're going to use numpy anyway, use np.array_split:
    [len(a) for a in np.array_split(np.arange(23), 4)]
    # Out: [6, 6, 6, 5]
    """
    x = int(n_tasks / n_workers)
    y = n_tasks % n_workers
    batch_sizes = [x + (y > 0)] * y + [x] * (n_workers - y)

    return batch_sizes


def build_batch_ranges(batch_sizes: list) -> list:
    """Build batch_ranges from list of batch_sizes.

    Example:
    # batch_sizes [6, 6, 6, 5]
    # >>>build_batch_ranges(batch_sizes)
    # Out: [range(0, 6), range(6, 12), range(12, 18), range(18, 23)]
    """
    upper_bounds = [*accumulate(batch_sizes)]
    lower_bounds = [0] + upper_bounds[:-1]
    batch_ranges = [range(l, u) for l, u in zip(lower_bounds, upper_bounds)]

    return batch_ranges

然后您的主脚本将如下所示:

Then your main script would look like this:

import time
from multiprocessing import Pool
from mp_utils import calc_batch_sizes, build_batch_ranges


def target_foo(batch_range):
    return sum(batch_range)  # ~ 6x faster than target_foo1


def target_foo1(batch_range):
    numbers = []
    for num in batch_range:
        numbers.append(num)
    return sum(numbers)


if __name__ == '__main__':

    N = 100000000
    N_CORES = 4

    batch_sizes = calc_batch_sizes(N, n_workers=N_CORES)
    batch_ranges = build_batch_ranges(batch_sizes)

    start = time.perf_counter()
    with Pool(N_CORES) as pool:
        result = pool.map(target_foo, batch_ranges)
        r_sum = sum(result)
    print(r_sum)
    print(f'elapsed: {time.perf_counter() - start:.2f} s')

请注意,我也将for循环切换为range对象的简单总和,因为它提供了更好的性能.如果您无法在实际应用中执行此操作,则列表理解仍将比示例中的手动填充列表快60%左右.

Note that I also switched your for-loop for a simple sum over the range object, since it offers much better performance. If you cant do this in your real app, a list comprehension would still be ~60% faster than filling your list manually like in your example.

示例输出:

4999999950000000
elapsed: 0.51 s

Process finished with exit code 0

这篇关于如何使用StdLib和Python 3在一定范围内并行化迭代?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆