Python multiprocessing: understanding logic behind `chunksize`


Problem Description


What factors determine an optimal chunksize argument to methods like multiprocessing.Pool.map()? The .map() method seems to use an arbitrary heuristic for its default chunksize (explained below); what motivates that choice and is there a more thoughtful approach based on some particular situation/setup?

Example - say that I am:

  • Passing an iterable to .map() that has ~15 million elements;
  • Working on a machine with 24 cores and using the default processes = os.cpu_count() within multiprocessing.Pool().

My naive thinking is to give each of 24 workers an equally-sized chunk, i.e. 15_000_000 / 24 or 625,000. Large chunks should reduce turnover/overhead while fully utilizing all workers. But it seems that this is missing some potential downsides of giving large batches to each worker. Is this an incomplete picture, and what am I missing?


Part of my question stems from the default logic for if chunksize=None: both .map() and .starmap() call ._map_async(), which looks like this:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    # ... (materialize `iterable` to list if it's an iterator)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0

What's the logic behind divmod(len(iterable), len(self._pool) * 4)? This implies that the chunksize will be closer to 15_000_000 / (24 * 4) == 156_250. What's the intention in multiplying len(self._pool) by 4?

This makes the resulting chunksize a factor of 4 smaller than my "naive logic" from above, which consists of just dividing the length of the iterable by the number of workers in pool._pool.

Lastly, there is also this snippet from the Python docs on .imap() that further drives my curiosity:

The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.


Related answer that is helpful but a bit too high-level: Python multiprocessing: why are large chunksizes slower?.

Solution

Short Answer

Pool's chunksize-algorithm is a heuristic. It provides a simple solution for all imaginable problem scenarios you are trying to stuff into Pool's methods. As a consequence, it cannot be optimized for any specific scenario.

The algorithm arbitrarily divides the iterable into approximately four times more chunks than the naive approach would. More chunks mean more overhead, but increased scheduling flexibility. As this answer will show, this leads to higher worker-utilization on average, but without guaranteeing a shorter overall computation time for every case.

"That's nice to know" you might think, "but how does knowing this help me with my concrete multiprocessing problems?" Well, it doesn't. The more honest short answer is, "there is no short answer", "multiprocessing is complex" and "it depends". An observed symptom can have different roots, even for similar scenarios.

This answer tries to provide you with basic concepts helping you to get a clearer picture of Pool's scheduling black box. It also tries to give you some basic tools at hand for recognizing and avoiding potential cliffs as far as they are related to chunksize.


Table of Contents

Part I

  1. Definitions
  2. Parallelization Goals
  3. Parallelization Scenarios
  4. Risks of Chunksize > 1
  5. Pool's Chunksize-Algorithm
  6. Quantifying Algorithm Efficiency

    6.1 Models

    6.2 Parallel Schedule

    6.3 Efficiencies

    6.3.1 Absolute Distribution Efficiency (ADE)

    6.3.2 Relative Distribution Efficiency (RDE)

Part II

  1. Naive vs. Pool's Chunksize-Algorithm
  2. Reality Check
  3. Conclusion

It is necessary to clarify some important terms first.


1. Definitions


Chunk

A chunk here is a share of the iterable-argument specified in a pool-method call. How the chunksize gets calculated and what effects this can have, is the topic of this answer.


Task

A task's physical representation in a worker-process in terms of data can be seen in the figure below.

The figure shows an example call to pool.map(), displayed along a line of code taken from the multiprocessing.pool.worker function, where a task read from the inqueue gets unpacked. worker is the underlying main-function in the MainThread of a pool-worker-process. The func-argument specified in the pool-method will only match the func-variable inside the worker-function for single-call methods like apply_async and for imap with chunksize=1. For the rest of the pool-methods with a chunksize-parameter, the processing-function func will be a mapper-function (mapstar or starmapstar). This function maps the user-specified func-parameter onto every element of the transmitted chunk of the iterable (--> "map-tasks"). The time this takes also defines a task as a unit of work.


Taskel

While the usage of the word "task" for the whole processing of one chunk is matched by code within multiprocessing.pool, there is no indication of how a single call to the user-specified func, with one element of the chunk as argument(s), should be referred to. To avoid confusion emerging from naming conflicts (think of the maxtasksperchild-parameter for Pool's __init__-method), this answer will refer to the single units of work within a task as taskels.

A taskel (from task + element) is the smallest unit of work within a task. It is the single execution of the function specified with the func-parameter of a Pool-method, called with arguments obtained from a single element of the transmitted chunk. A task consists of chunksize taskels.


Parallelization Overhead (PO)

PO consists of Python-internal overhead and overhead for inter-process communication (IPC). The per-task overhead within Python comes with the code needed for packaging and unpacking the tasks and their results. IPC-overhead comes with the necessary synchronization of threads and the copying of data between different address spaces (two copy steps needed: parent -> queue -> child). The amount of IPC-overhead is OS-, hardware- and data-size dependent, which makes generalizations about its impact difficult.
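
The effect of this overhead is easy to make visible. The following minimal benchmark sketch (my own illustration, not part of the original answer; the function name light_taskel is hypothetical) runs a near-zero-cost taskel over a large iterable once with chunksize=1 and once with a much larger chunksize; the first run is typically dominated by IPC and packaging overhead:

from multiprocessing import Pool
from time import perf_counter

def light_taskel(x):
    return x * x  # nearly zero computation per taskel -> PO dominates

if __name__ == '__main__':
    data = range(100_000)
    with Pool(4) as pool:
        for cs in (1, 1000):
            start = perf_counter()
            pool.map(light_taskel, data, chunksize=cs)
            print(f"chunksize={cs}: {perf_counter() - start:.2f} s")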


2. Parallelization Goals

When using multiprocessing, our overall goal (obviously) is to minimize total processing time for all tasks. To reach this overall goal, our technical goal needs to be optimizing the utilization of hardware resources.

Some important sub-goals for achieving the technical goal are:

  • minimize parallelization overhead (most famously, but not alone: IPC)
  • high utilization across all cpu-cores
  • keeping memory usage limited to prevent the OS from excessive paging (thrashing)

First of all, the tasks need to be computationally heavy (intensive) enough to earn back the PO we have to pay for parallelization. The relevance of PO decreases with increasing absolute computation time per taskel. Or, to put it the other way around, the bigger the absolute computation time per taskel for your problem, the less relevant the need for reducing PO becomes. If your computation will take hours per taskel, the IPC overhead will be negligible in comparison. The primary concern here is to prevent idling worker processes after all tasks have been distributed. Keeping all cores loaded means we are parallelizing as much as possible.


3. Parallelization Scenarios

What factors determine an optimal chunksize argument to methods like multiprocessing.Pool.map()

The major factor in question is how much computation time may vary across our single taskels. To name it, the choice for an optimal chunksize is determined by the Coefficient of Variation (CV) for computation times per taskel.

The two extreme scenarios on a scale, following from the extent of this variation are:

  1. All taskels need exactly the same computation time.
  2. A taskel could take seconds or days to finish.

For better memorability, I will refer to these scenarios as:

  1. Dense Scenario
  2. Wide Scenario


Dense Scenario

In a Dense Scenario it would be desirable to distribute all taskels at once, to keep the necessary IPC and context switching at a minimum. This means we want to create only as many chunks as there are worker processes. As already stated above, the weight of PO increases with shorter computation times per taskel.

For maximal throughput, we also want all worker processes busy until all tasks are processed (no idling workers). For this goal, the distributed chunks should be of equal size or close to it.


Wide Scenario

The prime example for a Wide Scenario would be an optimization problem, where results either converge quickly or computation can take hours, if not days. Usually it is not predictable what mixture of "light taskels" and "heavy taskels" a task will contain in such a case, hence it's not advisable to distribute too many taskels in a task-batch at once. Distributing fewer taskels at once than possible means increased scheduling flexibility. This is needed here to reach our sub-goal of high utilization across all cores.

If Pool's methods were, by default, totally optimized for the Dense Scenario, they would increasingly produce suboptimal timings for every problem located closer to the Wide Scenario.


4. Risks of Chunksize > 1

Consider this simplified pseudo-code example of a Wide Scenario-iterable, which we want to pass into a pool-method:

good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]

Instead of the actual values, we pretend to see the needed computation time in seconds, for simplicity only 1 minute or 1 day. We assume the pool has four worker processes (on four cores) and chunksize is set to 2. Because the order will be kept, the chunks sent to the workers will be these:

[(60, 60), (86400, 60), (86400, 60), (60, 84600)]

Since we have enough workers and the computation time is high enough, we can say that every worker process will get a chunk to work on in the first place. (This does not have to be the case for fast-completing tasks.) Further, we can say that the whole processing will take about 86400+60 seconds, because that's the highest total computation time for a chunk in this artificial scenario and we distribute chunks only once.

Now consider this iterable, which has only one element switching its position compared to the previous iterable:

bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]

...and the corresponding chunks:

[(60, 60), (86400, 86400), (60, 60), (60, 84600)]

Just bad luck with the sorting of our iterable nearly doubled (86400+86400) our total processing time! The worker getting the vicious (86400, 86400)-chunk is blocking the second heavy taskel in its task from getting distributed to one of the idling workers already finished with their (60, 60)-chunks. We obviously would not risk such an unpleasant outcome if we set chunksize=1.

This is the risk of bigger chunksizes. With higher chunksizes we trade scheduling flexibility for less overhead and in cases like above, that's a bad deal.

As we will see in chapter 6. Quantifying Algorithm Efficiency, bigger chunksizes can also lead to suboptimal results for Dense Scenarios.
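
A small scheduling sketch (my own simplification, not Pool's actual implementation) reproduces these timings: chunks are handed out in order to whichever worker becomes free first, and all overhead is ignored:

def makespan(durations, n_workers, chunksize):
    """Total wall-clock time for a greedy chunk-by-chunk schedule (no overhead)."""
    chunks = [durations[i:i + chunksize]
              for i in range(0, len(durations), chunksize)]
    workers = [0] * n_workers              # finish time per worker
    for chunk in chunks:
        idx = workers.index(min(workers))  # the next free worker takes the chunk
        workers[idx] += sum(chunk)
    return max(workers)

good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]
bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]

print(makespan(good_luck_iterable, 4, 2))  # 86460  (~ 86400 + 60)
print(makespan(bad_luck_iterable, 4, 2))   # 172800 (~ 86400 + 86400)
print(makespan(bad_luck_iterable, 4, 1))   # 86400  -> chunksize=1 avoids the bad luck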


5. Pool's Chunksize-Algorithm

Below you will find a slightly modified version of the algorithm inside the source code. As you can see, I cut off the lower part and wrapped it into a function for calculating the chunksize argument externally. I also replaced 4 with a factor parameter and outsourced the len() calls.

# mp_utils.py

def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate chunksize argument for Pool-methods.

    Resembles source-code within `multiprocessing.pool.Pool._map_async`.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize
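
As a quick sanity check (my own addition), plugging in the numbers from the question reproduces the chunksize estimated there, and a smaller example shows the remainder-adjustment kicking in:

print(calc_chunksize(24, 15_000_000))  # 156250 -> divmod(15_000_000, 96) leaves no remainder
print(calc_chunksize(4, 100))          # 7      -> divmod(100, 16) == (6, 4), hence 6 + 1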

To ensure we are all on the same page, here's what divmod does:

divmod(x, y) is a builtin function which returns (x//y, x%y). x // y is the floor division, returning the down rounded quotient from x / y, while x % y is the modulo operation returning the remainder from x / y. Hence e.g. divmod(10, 3) returns (3, 1).

Now when you look at chunksize, extra = divmod(len_iterable, n_workers * 4), you will notice that n_workers here is the divisor y in x / y, and that the multiplication by 4, without further adjustment through if extra: chunksize += 1 later on, leads to an initial chunksize at least four times smaller (for len_iterable >= n_workers * 4) than it would be otherwise.

To view the effect of the multiplication by 4 on the intermediate chunksize result, consider this function:

def compare_chunksizes(len_iterable, n_workers=4):
    """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
    for Pool's complete algorithm. Return chunksizes and the real factors by
    which naive chunksizes are bigger.
    """
    cs_naive = len_iterable // n_workers or 1  # naive approach
    cs_pool1 = len_iterable // (n_workers * 4) or 1  # incomplete pool algo.
    cs_pool2 = calc_chunksize(n_workers, len_iterable)

    real_factor_pool1 = cs_naive / cs_pool1
    real_factor_pool2 = cs_naive / cs_pool2

    return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2

The function above calculates the naive chunksize (cs_naive) and the first-step chunksize of Pool's chunksize-algorithm (cs_pool1), as well as the chunksize for the complete Pool-algorithm (cs_pool2). Further, it calculates the real factors rf_pool1 = cs_naive / cs_pool1 and rf_pool2 = cs_naive / cs_pool2, which tell us how many times the naively calculated chunksizes are bigger than Pool's internal version(s).
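
For instance, for the parameters used in the figures below (n_workers=4 and an iterable of length 500), it returns the following (my own worked example):

print(compare_chunksizes(500))
# (125, 31, 32, 4.032258064516129, 3.90625)
#  cs_naive, cs_pool1, cs_pool2, rf_pool1, rf_pool2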

Below you see two figures created with output from this function. The left figure just shows the chunksizes for n_workers=4 up until an iterable length of 500. The right figure shows the values for rf_pool1. For iterable length 16, the real factor becomes >=4 (for len_iterable >= n_workers * 4) and its maximum value is 7 for iterable lengths 28-31. That's a massive deviation from the original factor of 4, to which the algorithm converges for longer iterables. 'Longer' here is relative and depends on the number of specified workers.

Remember that chunksize cs_pool1 still lacks the extra-adjustment with the remainder from divmod that is contained in cs_pool2 of the complete algorithm.

The algorithm goes on with:

if extra:
    chunksize += 1

Now in cases where there is a remainder (an extra from the divmod-operation), increasing the chunksize by 1 obviously cannot work out for every task. After all, if it did, there would not be a remainder to begin with.

As you can see in the figures below, the "extra-treatment" has the effect that the real factor rf_pool2 now converges towards 4 from below and the deviation is somewhat smoother. The standard deviation for n_workers=4 and len_iterable=500 drops from 0.5233 for rf_pool1 to 0.4115 for rf_pool2.

Eventually, increasing chunksize by 1 has the effect that the last transmitted task only has a size of len_iterable % chunksize or chunksize.

The more interesting, and as we will see later more consequential, effect of the extra-treatment can however be observed for the number of generated chunks (n_chunks). For long enough iterables, Pool's complete chunksize-algorithm (n_pool2 in the figure below) will stabilize the number of chunks at n_chunks == n_workers * 4. In contrast, the naive algorithm (after an initial burp) keeps alternating between n_chunks == n_workers and n_chunks == n_workers + 1 as the length of the iterable grows.

Below you will find two enhanced info-functions for Pool's and the naive chunksize-algorithm. The output of these functions will be needed in the next chapter.

# mp_utils.py

from collections import namedtuple


Chunkinfo = namedtuple(
    'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
                  'chunksize', 'last_chunk']
)

def calc_chunksize_info(n_workers, len_iterable, factor=4):
    """Calculate chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    # `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
    n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
    # exploit `0 == False`
    last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

Don't be confused by the probably unexpected look of calc_naive_chunksize_info. The extra from divmod is not used for calculating the chunksize.

def calc_naive_chunksize_info(n_workers, len_iterable):
    """Calculate naive chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers)
    if chunksize == 0:
        chunksize = 1
        n_chunks = extra
        last_chunk = chunksize
    else:
        n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
        last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )
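
A quick comparison for the same parameters as before (4 workers, 500 taskels; my own worked example) shows the stabilization at n_chunks == n_workers * 4 versus the naive four chunks:

print(calc_chunksize_info(4, 500))
# Chunkinfo(n_workers=4, len_iterable=500, n_chunks=16, chunksize=32, last_chunk=20)
print(calc_naive_chunksize_info(4, 500))
# Chunkinfo(n_workers=4, len_iterable=500, n_chunks=4, chunksize=125, last_chunk=125)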


6. Quantifying Algorithm Efficiency

Now, after we have seen how the output of Pool's chunksize-algorithm looks different compared to output from the naive algorithm...

  • How to tell if Pool's approach actually improves something?
  • And what exactly could this something be?

As shown in the previous chapter, for longer iterables (a bigger number of taskels), Pool's chunksize-algorithm approximately divides the iterable into four times more chunks than the naive method. Smaller chunks mean more tasks and more tasks mean more Parallelization Overhead (PO), a cost which must be weighed against the benefit of increased scheduling-flexibility (recall "Risks of Chunksize>1").

For rather obvious reasons, Pool's basic chunksize-algorithm cannot weigh scheduling-flexibility against PO for us. IPC-overhead is OS-, hardware- and data-size dependent. The algorithm cannot know on what hardware we run our code, nor does it have a clue how long a taskel will take to finish. It's a heuristic providing basic functionality for all possible scenarios. This means it cannot be optimized for any scenario in particular. As mentioned before, PO also becomes increasingly less of a concern with increasing computation times per taskel (negative correlation).

When you recall the Parallelization Goals from chapter 2, one bullet-point was:

  • high utilization across all cpu-cores

The previously mentioned something that Pool's chunksize-algorithm can try to improve is the minimization of idling worker-processes, or respectively the utilization of cpu-cores.

A recurring question on SO regarding multiprocessing.Pool is asked by people wondering about unused cores / idling worker-processes in situations where you would expect all worker-processes to be busy. While this can have many reasons, idling worker-processes towards the end of a computation are an observation we can often make, even with Dense Scenarios (equal computation times per taskel), in cases where the number of workers is not a divisor of the number of chunks (n_chunks % n_workers > 0).

The question now is:

How can we practically translate our understanding of chunksizes into something which enables us to explain observed worker-utilization, or even compare the efficiency of different algorithms in that regard?


6.1 Models

For gaining deeper insights here, we need a form of abstraction of parallel computations which simplifies the overly complex reality down to a manageable degree of complexity, while preserving significance within defined boundaries. Such an abstraction is called a model. An implementation of such a "Parallelization Model" (PM) generates worker-mapped meta-data (timestamps) as real computations would, if the data were to be collected. The model-generated meta-data allows predicting metrics of parallel computations under certain constraints.

One of two sub-models within the here defined PM is the Distribution Model (DM). The DM explains how atomic units of work (taskels) are distributed over parallel workers and time, when no other factors than the respective chunksize-algorithm, the number of workers, the input-iterable (number of taskels) and their computation durations are considered. This means any form of overhead is not included.

For obtaining a complete PM, the DM is extended with an Overhead Model (OM), representing various forms of Parallelization Overhead (PO). Such a model needs to be calibrated for each node individually (hardware-, OS-dependencies). How many forms of overhead are represented in an OM is left open, so multiple OMs with varying degrees of complexity can exist. Which level of accuracy the implemented OM needs is determined by the overall weight of PO for the specific computation. Shorter taskels lead to a higher weight of PO, which in turn requires a more precise OM if we were attempting to predict Parallelization Efficiencies (PE).


6.2 Parallel Schedule (PS)

The Parallel Schedule is a two-dimensional representation of the parallel computation, where the x-axis represents time and the y-axis represents a pool of parallel workers. The number of workers and the total computation time mark the extent of a rectangle, into which smaller rectangles are drawn. These smaller rectangles represent atomic units of work (taskels).

Below you find the visualization of a PS drawn with data from the DM of Pool's chunksize-algorithm for the Dense Scenario.

  • The x-axis is sectioned into equal units of time, where each unit stands for the computation time a taskel requires.
  • The y-axis is divided into the number of worker-processes the pool uses.
  • A taskel here is displayed as the smallest cyan-colored rectangle, put into a timeline (a schedule) of an anonymized worker-process.
  • A task is one or multiple taskels in a worker-timeline continuously highlighted with the same hue.
  • Idling time units are represented through red colored tiles.
  • The Parallel Schedule is partitioned into sections. The last section is the tail-section.

The names for the composed parts can be seen in the picture below.

In a complete PM including an OM, the Idling Share is not limited to the tail, but also comprises space between tasks and even between taskels.
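
The original figures are not reproduced here, but a rough text rendering of such a Dense-Scenario schedule can be sketched from the DM alone (my own simplification: one time unit per taskel, workers pull chunks in order, letters mark tasks and '.' marks the Idling Share in the tail):

def draw_schedule(n_workers, len_iterable, chunksize):
    """Print a crude Dense-Scenario Parallel Schedule, one character per time unit."""
    chunk_sizes = [min(chunksize, len_iterable - i)
                   for i in range(0, len_iterable, chunksize)]
    timelines = [[] for _ in range(n_workers)]
    for i, size in enumerate(chunk_sizes):
        # the worker with the shortest timeline so far takes the next chunk
        idx = min(range(n_workers), key=lambda w: len(timelines[w]))
        timelines[idx].extend([chr(ord('A') + i % 26)] * size)
    width = max(len(t) for t in timelines)
    for w, t in enumerate(timelines):
        print(f"W{w}: " + "".join(t).ljust(width, '.'))

draw_schedule(n_workers=4, len_iterable=39, chunksize=3)
# W0: AAAEEEIIIMMM
# W1: BBBFFFJJJ...
# W2: CCCGGGKKK...
# W3: DDDHHHLLL...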


6.3 Efficiencies

The Models introduced above allow quantifying the rate of worker-utilization. We can distinguish:

  • Distribution Efficiency (DE) - calculated with help of a DM (or a simplified method for the Dense Scenario).
  • Parallelization Efficiency (PE) - either calculated with help of a calibrated PM (prediction) or calculated from meta-data of real computations.

It's important to note that calculated efficiencies do not automatically correlate with faster overall computation for a given parallelization problem. Worker-utilization in this context only distinguishes between a worker having a started, yet unfinished taskel and a worker not having such an "open" taskel. That means possible idling during the time span of a taskel is not registered.

All above mentioned efficiencies are basically obtained by calculating the quotient of the division Busy Share / Parallel Schedule. The difference between DE and PE comes with the Busy Share occupying a smaller portion of the overall Parallel Schedule for the overhead-extended PM.

This answer will further only discuss a simple method to calculate DE for the Dense Scenario. This is sufficiently adequate to compare different chunksize-algorithms, since...

  1. ... the DM is the part of the PM, which changes with different chunksize-algorithms employed.
  2. ... the Dense Scenario with equal computation durations per taskel depicts a "stable state", for which these time spans drop out of the equation. Any other scenario would just lead to random results since the ordering of taskels would matter.


6.3.1 Absolute Distribution Efficiency (ADE)

This basic efficiency can be calculated in general by dividing the Busy Share by the whole potential of the Parallel Schedule:

Absolute Distribution Efficiency (ADE) = Busy Share / Parallel Schedule

For the Dense Scenario, the simplified calculation-code looks like this:

# mp_utils.py

def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Absolute Distribution Efficiency (ADE).

    `len_iterable` is not used, but contained to keep a consistent signature
    with `calc_rde`.
    """
    if n_workers == 1:
        return 1

    potential = (
        ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
        + (n_chunks % n_workers == 1) * last_chunk
    ) * n_workers

    n_full_chunks = n_chunks - (chunksize > last_chunk)
    taskels_in_regular_chunks = n_full_chunks * chunksize
    real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
    ade = real / potential

    return ade
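
For illustration (my own worked numbers), take the 39-taskel example pictured further below: the Dense-Scenario ADE drops noticeably once Pool's chunksize of 3 is used instead of 1:

print(calc_ade(4, 39, n_chunks=39, chunksize=1, last_chunk=1))  # 0.975
print(calc_ade(4, 39, n_chunks=13, chunksize=3, last_chunk=3))  # 0.8125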

If there is no Idling Share, Busy Share will be equal to Parallel Schedule, hence we get an ADE of 100%. In our simplified model, this is a scenario where all available processes will be busy through the whole time needed for processing all tasks. In other words, the whole job gets effectively parallelized to 100 percent.

But why do I keep referring to DE as absolute DE here?

To comprehend that, we have to consider a possible case for the chunksize (cs) which ensures maximal scheduling flexibility (also, the number of Highlanders there can be. Coincidence?):

__________________________________~ ONE ~__________________________________

If we, for example, have four worker-processes and 37 taskels, there will be idling workers even with chunksize=1, just because n_workers=4 is not a divisor of 37. The remainder of dividing 37 / 4 is 1. This single remaining taskel will have to be processed by a sole worker, while the remaining three are idling.

Likewise, there will still be one idling worker with 39 taskels, as you can see pictured below.

When you compare the upper Parallel Schedule for chunksize=1 with the version below for chunksize=3, you will notice that the upper Parallel Schedule is smaller, with a shorter timeline on the x-axis. It should become obvious now how bigger chunksizes, unexpectedly, can also lead to increased overall computation times, even for Dense Scenarios.

But why not just use the length of the x-axis for efficiency calculations?

Because the overhead is not contained in this model. It will be different for both chunksizes, hence the x-axis is not really directly comparable. The overhead can still lead to a longer total computation time, as shown in case 2 in the figure below.


6.3.2 Relative Distribution Efficiency (RDE)

The ADE value does not contain the information whether a better distribution of taskels is possible with chunksize set to 1. Better here still means a smaller Idling Share.

To get a DE value adjusted for the maximum possible DE, we have to divide the considered ADE by the ADE we get for chunksize=1.

Relative Distribution Efficiency (RDE) = ADE_cs_x / ADE_cs_1

Here is how this looks in code:

# mp_utils.py

def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Relative Distribution Efficiency (RDE)."""
    ade_cs1 = calc_ade(
        n_workers, len_iterable, n_chunks=len_iterable,
        chunksize=1, last_chunk=1
    )
    ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
    rde = ade / ade_cs1

    return rde
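
Continuing the 39-taskel example (again my own worked numbers), the RDE quantifies how much of the achievable distribution quality the chunksize of 3 gives away, while an iterable that divides evenly, such as 32 taskels with chunksize 2, reaches 100%:

print(calc_rde(4, 39, n_chunks=13, chunksize=3, last_chunk=3))  # 0.8333...
print(calc_rde(4, 32, n_chunks=16, chunksize=2, last_chunk=2))  # 1.0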

RDE, as defined here, is in essence a tale about the tail of a Parallel Schedule. RDE is influenced by the maximum effective chunksize contained in the tail. (This tail can be of x-axis length chunksize or last_chunk.) This has the consequence that RDE naturally converges to 100% (even) for all sorts of "tail-looks", as shown in the figure below.

A low RDE ...

  • is a strong hint for optimization potential.
  • naturally gets less likely for longer iterables, because the relative tail-portion of the overall Parallel Schedule shrinks.

Please find Part II of this answer here.
