Poor scaling of multiprocessing Pool.map() on a list of large objects: How to achieve better parallel scaling in python?



Let us define :

from multiprocessing import Pool
import numpy as np
def func(x):
    for i in range(1000):
        i**2
    return 1

Notice that func() does something and it always returns a small number 1.

Then, I compare an 8-core parallel Pool.map() versus the serial, Python built-in map()

n=10**3
a=np.random.random(n).tolist()

with Pool(8) as p:
    %timeit -r1 -n2  p.map(func,a)
%timeit -r1 -n2  list(map(func,a))

This gives :

38.4 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
200 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)

which shows quite good parallel scaling: with 8 cores, 38.4 [ms] is roughly a fifth of 200 [ms], i.e. about a 5x speedup.

Then let us try Pool.map() on lists of some bigger things, for simplicity, I use a list-of-lists this way :

n=10**3
m=10**4
a=np.random.random((n,m)).tolist()

with Pool(8) as p:
    %timeit -r1 -n2  p.map(func,a)
%timeit -r1 -n2  list(map(func,a))

which gives :

292 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
209 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)

You see, the parallel scaling is gone! 292 ms parallel versus 209 ms serial - the 8-core version is now ~1.4x slower.

We can make it much worse by making each sub-list that gets passed even bigger :

n=10**3
m=10**5
a=np.random.random((n,m)).tolist()

with Pool(8) as p:
    %timeit -r1 -n2  p.map(func,a)
%timeit -r1 -n2  list(map(func,a))

This gives :

3.29 s ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
179 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)

Wow, with even larger sub-lists the timing result is totally reversed: we use 8 cores and get roughly 20-times slower timing!!

You can also notice that the serial map()'s timing has nothing to do with the sub-list size. So a reasonable explanation would be that Pool.map() is really passing the content of those big sub-lists around between processes, which causes additional copying?

I am not sure. But if so, why doesn't it pass the address of the sub-list? After all, the sub-lists are already in memory, and in practice the func() I use is guaranteed not to change/modify them.

So, in python, what is the correct way to keep parallel scaling when mapping some operations on a list of large things?

Solution

Before we start
and dive deeper into any hunt for nanoseconds ( and right, it will soon start, as each [ns] matters once the scaling opens the whole Pandora's Box of problems ), let's first agree on the scales. The easiest and often "cheapest" premature tricks may, and often will, derail your dreams once the problem size grows to realistic scales: the thousands ( seen above in both iterators ) behave very differently for in-cache computing, with < 0.5 [ns] data fetches, than once the data has grown beyond the L1/L2/L3 cache sizes - at scales of 1E+5, 1E+6, 1E+9, above [GB]s, each mis-aligned fetch is WAY more EXPENSIVE than a few 100 [ns].

Q : "... because I have 8 cores, I want to use them to get 8 times faster"

I wish you could, indeed. Yet, sorry for telling the truth straight, the World does not work this way.

See this interactive tool: it will show you both the speedup limits and their principal dependence on the actual production costs of the real-world scaling of the initial problem, as it grows from trivial sizes, and the combined effect of these at scale. Just click it and play with the sliders to see it live, in action.

Q : Is Pool.map() really passing the content of those big sub-lists around between processes, causing an additional copy?

Yes,
it must do so, by design
plus it does that by passing all that data "through" another "expensive" SER/DES processing,
so as to get it delivered "there".
The very same would apply vice-versa whenever you would have tried to return "back" some mastodon-sized result(s), which you did not, here above.
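The SER/DES toll is easy to observe directly, without any Pool at all: pickling one 10**4-float sub-list of the kind used above already produces a message of roughly 90 kB, paid per call, and paid again for anything sent back. A minimal sketch:

```python
import pickle
import random

# one sub-list of the kind Pool.map() must ship to a worker: 10**4 floats
sub_list = [random.random() for _ in range(10**4)]

payload  = pickle.dumps(sub_list)   # the SER step, paid on the parent's side
restored = pickle.loads(payload)    # the DES step, paid inside the worker

# each float costs ~9 bytes on the wire, so this is already a ~90 kB
# message - per task, and any result would travel the same way back
print(len(payload))
```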

Q : But if so, why doesn't it pass the address of the sub-list?

Because the remote ( parameter-receiving ) process is another, fully autonomous process, with its own, separate and protected address-space, we cannot just pass an address-reference "into" it. And we wanted a fully independent, autonomously working python process, didn't we ( due to a will to use this trick so as to escape from the GIL-lock dancing )? Sure we did - this is a central step of our escape from the GIL-Wars ( for a better understanding of the GIL-lock pros and cons, the linked talks, Pg.15+ on CPU-bound processing, are worth a read ).
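That said, if zero-copy access is what is really wanted, the standard library does offer an explicit escape hatch since Python 3.8: multiprocessing.shared_memory, where only a short block name, a shape and a dtype cross the process boundary, never the data itself. A minimal sketch of the idea - row_sum() and the tuple layout are illustrative, not any Pool API, and it is called in-process here for brevity; a real worker would perform the same re-attach:

```python
import numpy as np
from multiprocessing import shared_memory

# parent: put the data into a named shared-memory block, exactly once
data   = np.random.random((100, 1000))
shm    = shared_memory.SharedMemory(create=True, size=data.nbytes)
shared = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
shared[:] = data[:]                      # one copy, paid once, up front

def row_sum(args):
    """What a worker would do: re-attach to the block by name."""
    shm_name, shape, dtype, row = args
    local = shared_memory.SharedMemory(name=shm_name)
    view  = np.ndarray(shape, dtype=dtype, buffer=local.buf)
    result = float(view[row].sum())      # read-only work on the shared block
    del view                             # release the buffer export first,
    local.close()                        # otherwise close() raises BufferError
    return result

# each task ships a few dozen bytes instead of an ~9 kB pickled row
task    = (shm.name, data.shape, str(data.dtype), 0)
result0 = row_sum(task)

del shared                               # same rule as inside row_sum()
shm.close()
shm.unlink()                             # the creator releases the block
```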

             0.1 ns - NOP
             0.3 ns - XOR, ADD, SUB
             0.5 ns - CPU L1 dCACHE reference           (1st introduced in late 80-ies )
             0.9 ns - JMP SHORT
             1   ns - speed-of-light (a photon) travel a 1 ft (30.5cm) distance -- will stay, throughout any foreseeable future :o)
?~~~~~~~~~~~ 1   ns - MUL ( i**2 = MUL i, i )~~~~~~~~~ doing this 1,000 x is 1 [us]; 1,000,000 x is 1 [ms]; 1,000,000,000 x is 1 [s] ~~~~~~~~~~~~~~~~~~~~~~~~~
           3~4   ns - CPU L2  CACHE reference           (2020/Q1)
             5   ns - CPU L1 iCACHE Branch mispredict
             7   ns - CPU L2  CACHE reference
            10   ns - DIV
            19   ns - CPU L3  CACHE reference           (2020/Q1 considered slow on 28c Skylake)
            71   ns - CPU cross-QPI/NUMA best  case on XEON E5-46*
           100   ns - MUTEX lock/unlock
           100   ns - own DDR MEMORY reference
           135   ns - CPU cross-QPI/NUMA best  case on XEON E7-*
           202   ns - CPU cross-QPI/NUMA worst case on XEON E7-*
           325   ns - CPU cross-QPI/NUMA worst case on XEON E5-46*
        10,000   ns - Compress 1K bytes with a Zippy PROCESS
        20,000   ns - Send     2K bytes over 1 Gbps  NETWORK
       250,000   ns - Read   1 MB sequentially from  MEMORY
       500,000   ns - Round trip within a same DataCenter
?~~~ 2,500,000   ns - Read  10 MB sequentially from  MEMORY~~(about an empty python process to copy on spawn)~~~~ x ( 1 + nProcesses ) on spawned process instantiation(s), yet an empty python interpreter is indeed not a real-world, production-grade use-case, is it?
    10,000,000   ns - DISK seek
    10,000,000   ns - Read   1 MB sequentially from  NETWORK
?~~ 25,000,000   ns - Read 100 MB sequentially from  MEMORY~~(somewhat light python process to copy on spawn)~~~~ x ( 1 + nProcesses ) on spawned process instantiation(s)
    30,000,000   ns - Read 1 MB sequentially from a  DISK
?~~ 36,000,000   ns - Pickle.dump() SER a 10 MB object for IPC-transfer and remote DES in spawned process~~~~~~~~ x ( 2 ) for a single 10MB parameter-payload SER/DES + add an IPC-transport costs thereof or NETWORK-grade transport costs, if going into [distributed-computing] model Cluster ecosystem
   150,000,000   ns - Send a NETWORK packet CA -> Netherlands
  |   |   |   |
  |   |   | ns|
  |   | us|
  | ms|

Q : " what is the correct way to keep parallel scaling when parallel mapping some operations on a list of large things? "

A )
UNDERSTAND THE WAYS TO AVOID OR AT LEAST REDUCE EXPENSES :

Understand all the types of the costs you have to pay and will pay :

  • keep the process instantiation costs as low as possible ( they are rather expensive ), best paid as a one-time cost only

    On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess. See bpo-33725.

  • spend as little on parameter-passing costs as you can ( yes, best to avoid repeatedly passing those "large things" as parameters )

  • never waste resources on things that do not perform your job - never spawn more processes than reported by len( os.sched_getaffinity( 0 ) ). Any process above that count will just wait for its next CPU-core slot, and will just evict other, cache-efficient processes, thus re-paying all the fetch-costs already paid once, so as to re-fetch all the data and camp it back in-cache, only to be evicted again soon - while the processes that had been working cache-efficiently so far get evicted ( for what good? ) by a naive use of as many as multiprocessing.cpu_count()-reported processes, all so expensively spawned in the initial Pool-creation
  • better to re-use pre-allocated memory than to keep paying ad-hoc memory-allocation costs ALAP
  • never share a bit, if The Performance is the goal
  • never block, never - be it the python gc ( which may block, if not avoided ), or a Pool.map() ( which blocks as well )

B )
UNDERSTAND THE WAYS TO INCREASE THE EFFICIENCY :

Understand all the efficiency-increasing tricks, even at a cost of code complexity ( a few SLOC-s are easy to show in school-books, yet they sacrifice both the efficiency and the performance - despite these two being exactly what you need most in a fight for sustainable performance throughout the scaling ( of either the problem size or the iteration depth, or when growing both at the same time ) ).

Some categories of the real-world costs from A ) have dramatically changed the limits of the theoretically achievable speedups to be expected from going into some form of [PARALLEL] process orchestration ( here, making some parts of the code execute in the spawned sub-processes ). The initial view of this was first formulated by Dr. Gene Amdahl more than 60 years ago; it has since been extended with two principal additions - the process-instantiation setup + termination add-on costs ( extremely important always in py2, and in py3.5+ for macOS and Windows ) and the atomicity-of-work, both of which are discussed below.

Overhead-strict re-formulation of the Amdahl's Law speedup S:

S   = speedup which can be achieved with N processors
s   = a proportion of a calculation, which is [SERIAL]
1-s = a parallelizable portion, that may run  [PAR]
N   = a number of processors ( CPU-cores ) actively participating on [PAR] processing

               1
S =  __________________________; where s, ( 1 - s ), N were defined above
                ( 1 - s )            pSO:= [PAR]-Setup-Overhead     add-on cost/latency
     s  + pSO + _________ + pTO      pTO:= [PAR]-Terminate-Overhead add-on cost/latency
                    N               

Overhead-strict and resources-aware re-formulation:

                           1                         where s, ( 1 - s ), N
S =  ______________________________________________ ;      pSO, pTO
                   | ( 1 - s )             |               were defined above
     s  + pSO + max|  _________ , atomicP  |  + pTO        atomicP:= a unit of work,
                   |     N                 |                         further indivisible,
                                                                     a duration of an
                                                                     atomic-process-block
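The two formulations above are straightforward to evaluate numerically, and doing so shows how fast the pSO / pTO add-on costs eat the theoretical N-fold speedup. A small calculator sketch, with names taken from the formula:

```python
def amdahl_overhead_strict(s, N, pSO=0.0, pTO=0.0, atomicP=0.0):
    """Overhead-strict, resources-aware speedup S for N processors.

    s        -- the [SERIAL] proportion of the calculation, 0 <= s <= 1
    pSO, pTO -- setup / terminate add-on overheads (same units as the work)
    atomicP  -- duration of the further-indivisible unit of work
    """
    par = max((1.0 - s) / N, atomicP)   # the [PAR] leg cannot shrink below atomicP
    return 1.0 / (s + pSO + par + pTO)

# with zero overheads, the classical Amdahl's Law is recovered:
print(amdahl_overhead_strict(s=0.05, N=8))                      # ~5.93x
# realistic spawn + SER/DES overheads flatten the curve considerably:
print(amdahl_overhead_strict(s=0.05, N=8, pSO=0.10, pTO=0.05))  # ~3.14x
```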


Prototype on target CPU/RAM device with your python, scaled >>1E+6

Any simplified mock-up example will somehow skew your expectations of how the actual workloads will perform in vivo. RAM allocations underestimated at small scales may later surprise at scale, sometimes even throwing the operating system into sluggish states, swapping and thrashing. Some smarter tools ( numba.jit() ) may even analyse the code and short-cut passages that will never be visited or that do not produce any result, so be warned that simplified examples may lead to surprising observations.

from multiprocessing import Pool
import numpy as np
import os

SCALE = int( 1E9 )
STEP  = int( 1E1 )
aLIST = np.random.random( ( 10**3, 10**4 ) ).tolist()

#######################################################################################
#   func() does some SCALE'd amount of work, yet
#                                                passes almost zero bytes as parameters
#                                                allocates nothing, but iterator
#                                                returns one byte,
#                                                invariant to any expensive inputs
def func( x ):  
    for i in range( SCALE ):
        i**2
    return 1

A few hints on making the scaling strategy less expensive in overhead costs :

#####################################################################################
#   more_work_en_block() wraps some SCALE'd amount of work, sub-list specified
def more_work_en_block( en_block = [ None, ] ):
    return [ func( nth_item ) for nth_item in en_block ]

If you indeed must pass a big list, better to pass one larger block and remote-iterate over its parts, instead of paying the transfer costs for each and every item, many more times than when using sub_blocks. Parameters get SER/DES processed ( ~ the costs of pickle.dumps() + pickle.loads() ) [per-each-call], again at add-on costs, which decrease the resulting efficiency and worsen the overhead part of the extended, overhead-strict Amdahl's Law.

#####################################################################################
#   some_work_en_block() wraps some SCALE'd amount of work, tuple-specified
def some_work_en_block( sub_block = ( [ None, ], 0, 1 ) ):
    return more_work_en_block( en_block = sub_block[0][sub_block[1]:sub_block[2]] )


Right-sizing the number of process-instances :

aMaxNumOfProcessesThatMakesSenseToSPAWN = len( os.sched_getaffinity( 0 ) ) # never more

with Pool( aMaxNumOfProcessesThatMakesSenseToSPAWN ) as p:
     results = list( p.imap_unordered( some_work_en_block,  # the tuple-aware wrapper
                                       [ ( aLIST,
                                           start,
                                           start + STEP
                                           )
                                         for start in range( 0, len( aLIST ), STEP ) ]
                                       ) )                  # consume the lazy iterator

Last but not least, expect immense performance boosts from smart use of numpy vectorised code, best without repetitive passing of static data. Data pre-copied during the process instantiation(s) ( thus paid as a reasonably scaled, here unavoidable, one-time cost ) can be used in the worker code as read-only data, in a vectorised ( CPU-very-efficient ) fashion, without passing the same data again via parameters. Some examples of how one can get a ~ +500 x speedup may be read here or here, about a ~ +400 x speedup, or about a case of just a ~ +100 x speedup, with some examples of problem-isolation testing scenarios.
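As a taste of what vectorisation buys even before any multiprocessing: the hot loop of func() collapses into a single C-level numpy reduction. The variant below accumulates the squares ( the original discarded them ) so the two versions can be compared; exact speedups are hardware-dependent:

```python
import numpy as np

def func_loop(n):
    # the original, interpreted hot loop, made to return its work
    total = 0
    for i in range(n):
        total += i * i
    return total

def func_vectorised(n):
    # the same arithmetic as one C-level numpy dot-product
    i = np.arange(n, dtype=np.int64)
    return int(np.dot(i, i))

# identical results, typically orders of magnitude apart in runtime
assert func_loop(10**4) == func_vectorised(10**4)
```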

Anyway, the closer will the mock-up code be to your actual workloads, the more sense the benchmarks will get to have ( at scale & in production ).


Good luck on exploring the World, as it is,
not as a dream if it were different,
not as a wish it were different or that we would like it to be

:o)

Facts and Science matter - both + together

Records of Evidence are the core steps forwards to achieve as high performance as possible,
not any Product Marketing,
not any Evangelisation Clans wars,
not any Blog-posts' chatter

At least don't say you weren't warned

:o)

