Why python multiprocessing takes more time than serial code? How to speedup this?


I was trying out the Python multiprocessing module. In the code below, the serial execution time is 0.09 seconds and the parallel execution time is 0.2 seconds. As I am getting no speedup, I think I might be going wrong somewhere.

import multiprocessing as mp
from random import uniform, randrange
import time

# m = mp.Manager()
out_queue = mp.Queue()

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals


def flop(val, a, b, out_queue):
    cals = []
    for v in val:
        cals.append(v + a * b)
    # print cals
    out_queue.put(cals)
    # print "Exec over"


def concurrency():
    # out_queue1 = mp.Queue()
    # out_queue2 = mp.Queue()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    start_time = time.time()
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue))
    p1.start()
    out_queue.get()
    # print "
Final:", len(out_queue.get())
    p2.start()
    out_queue.get()
    # print "
Final:", len(out_queue.get())
    p3.start()
    out_queue.get()

    p4.start()
    out_queue.get()

    p1.join()
    p2.join()
    p3.join()
    p4.join()

    print "Running time parallel: ", time.time() - start_time, "secs"

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"

My system has four cores. Please let me know of ways I can speed up this code. Also, what are my options for parallel programming with Python (other than the multiprocessing module)?

Thanks and Regards

Solution

Love is a passion . . . but can hurt a lot, if one's belief is just blind or naive to evidence

I love python for its ease of use and for its universality, yet getting towards HPC performance requires more: hardware-related insights and optimisation-tweaking efforts also need to be put in.

@RupjitChakraborty as you might enjoy in my answer below, the same result could be received in a pure-[SERIAL]-code ~50x faster than in your best case and about ~100x faster than Mark's reported time. Feel free to re-test it on your hardware, so as to have a same platform for a bit more rigorous comparisons of performance readings. Nevertheless, enjoy the hunt for performance! – user3666197 Dec 1 '17 at 13:39

If I may put a few cents into this never-ending hunt for performance:
- try to well understand both the original Amdahl's Law + its new, overhead-strict re-formulation ( a toy calculation of the latter is sketched right after this list )
- try to well quantify the costs of the add-on overheads that appear in process-management
- try to well quantify the costs of the add-on overheads that relate to large data transfers ( a one-stop cost )
- try to avoid any and all potential (b)locking, some of which might be hidden "behind" the constructors used
- try to avoid any processing-unrelated overhead costs of synchronisation + communication
- try to prevent any CPU_core cache misses and also minimise coherence losses ( yes, easy to say, hard to code - i.e. a manually crafted code often gets better than a simple one-liner using some highly-abstracted syntax-constructor ( but at a cost one cannot manage ), as you can take better cache-related decisions under your own control than by relying on some context-unaware, pre-fabricated, universal ( i.e. unrelated to your particular priorities ) code transformation )
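
As a toy illustration of the first item, a hypothetical overhead-strict speedup calculation ( a sketch only -- all names and constants below are made-up placeholders, not measurements ):

def overhead_strict_speedup( T_serial,     # [us] duration of the purely serial part
                             T_parallel,   # [us] duration of the parallelisable part
                             N,            # number of worker processes
                             oh_setup,     # [us] per-worker spawn + data-SER/DES add-on costs
                             oh_teardown   # [us] per-worker join + result-collection add-on costs
                             ):
    T_1 = T_serial + T_parallel                                       # a monolithic run
    T_N = T_serial + T_parallel / N + N * ( oh_setup + oh_teardown )  # a "parallelised" run
    return T_1 / T_N

# ~90000 [us] of useful work, split among 4 workers, each one costing
# ~30000 [us] in add-on overheads, yields a "speedup" well below 1.0,
# i.e. the very slowdown observed in the question:
print( overhead_strict_speedup( 0., 90000., 4, 15000., 15000. ) )     # ~0.63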


Want speedup?
Always systematically test individual factors in isolation:

As a brief view into the actual costs your code will pay ( in [us] ): never guess, test it.

Test-case A: measures the process-management [SERIAL]-process-scheduling add-on costs
Test-case B: measures the remote-process memory-allocation add-on costs
Test-case C: measures the remote-process [CONCURRENT]-process-scheduling computing costs
Test-case D: measures the impact of remote-process workloads on the [CONCURRENT]-scheduling costs

For details,
one may read further and re-use / improve naive code templates
in chapter [ The Architecture, Resources and Process-scheduling facts that matter ].
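
As a minimal sketch of the first of these, a Test-case A may measure just the bare spawn + join cost of an empty process ( no payload, no queues; the naming below is illustrative ):

import multiprocessing as mp
import time

def an_EMPTY_FUN():                       # intentionally does nothing at all
    pass

if __name__ == '__main__':
    aStartTIME = time.time()
    aProcess   = mp.Process( target = an_EMPTY_FUN )
    aProcess.start()                      # pay the process-instantiation add-on costs
    aProcess.join()                       #   + the process-termination add-on costs
    print( "spawn + join of an EMPTY process: {0:d} [us]".format( int( ( time.time() - aStartTIME ) * 1E+6 ) ) )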

As Mark has warned already, another cost to the overhead-strict Amdahl's Law speedup calculation will come from the data-transfers from the main process towards each of the spawned subprocesses, where the pure-[SERIAL] add-on overheads will and do grow more than linearly with the data volume, due to colliding access patterns, resource physical-capacity contention, shared-objects signallisation-(b)locking overheads and similar, hardware-unavoidable obstacles.

Before going any deeper into the performance-tweaking options, one may propose an easy Test-case E for measuring this very class of memory-data-transfer add-on costs:

import joblib                      # needed below for joblib.Parallel / joblib.delayed

def a_FAT_DATA_XFER_COSTS_FUN( anIndeedFatPieceOfDATA ):
    """                                                 __doc__
    The intent of this FUN() is indeed to do nothing at all,
                             but to be able to benchmark
                             add-on overhead costs
                             raised by a need to transfer
                             some large amount of data
                             from a main()-process
                             to this FUN()-subprocess spawned.
    """
    return ( anIndeedFatPieceOfDATA[ 0]
           + anIndeedFatPieceOfDATA[-1]
             )

##############################################################
###  A NAIVE TEST BENCH
##############################################################
from zmq import Stopwatch; aClk = Stopwatch()
JOBS_TO_SPAWN =  4         # TUNE:  1,  2,  4,   5,  10, ..
RUNS_TO_RUN   = 10         # TUNE: 10, 20, 50, 100, 200, 500, 1000, ..
SIZE_TO_XFER  = 1E+6       # TUNE: +6, +7, +8,  +9, +10, ..

DATA_TO_XFER  = [ 1 for _ in range( int( SIZE_TO_XFER ) ) ]

try:
     aClk.start()
     #-----------------------------------------------------<_CODE_UNDER_TEST_>
     joblib.Parallel(  n_jobs = JOBS_TO_SPAWN
                      )( joblib.delayed( a_FAT_DATA_XFER_COSTS_FUN )
                                       ( a_FAT_DATA )
                                   for ( a_FAT_DATA )
                                   in  [       DATA_TO_XFER
                                         for _ in range( RUNS_TO_RUN )
                                         ]
                         )
     #-----------------------------------------------------<_CODE_UNDER_TEST_>
except:
     pass
finally:
     try:
         _ = aClk.stop()
     except:
         _ = -1
         pass

template = "CLK:: {0:_>24d} [us] @{1: >3d} run{2: >5d} RUNS ( {3: >12.3f}[MB]"

print( template.format( _,
                        JOBS_TO_SPAWN,
                        RUNS_TO_RUN,
                        SIZE_TO_XFER / 1024. / 1024.
                        )
       )


Please let me know of ways I can speed up this code.

  • learn about numba, definitely worth knowing this tool for performance boosting ( a sketch follows right after this list )
  • learn about vectorisation of operations
  • after mastering these two, one might look into re-formulating the already perfect code into Cython
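
As a hedged sketch of the numba route ( assuming numba is installed; the kernel mirrors the flop()-loop from the question and all names are illustrative ):

import numpy as np
from numba import njit

@njit                                     # gets JIT-compiled on the first call
def flop_numba( r, a, b ):
    out = np.empty_like( r )              # one explicit allocation
    for i in range( r.shape[0] ):         # the loop itself compiles to near-C speed
        out[i] = r[i] + a * b
    return out

The first call pays a one-off compilation cost, so time the second and later calls only. For the vectorisation route, compare the two plain-numpy variants below: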

import numpy as np
from zmq import Stopwatch; aClk = Stopwatch()

a, b = 3.3, 4.4
rVEC = np.random.uniform( 1, 4, int( 1E+6 ) )    # numpy requires an integer size here

def flop_NaivePY( r, a, b ):
    return( r + ( a * b ) )                      # allocates a whole new 1E+6-element array

aClk.start(); _ = flop_NaivePY( rVEC, a, b ); aClk.stop()
4868L
4253L
4113L
4376L
4333L
4137L
4.~_____[ms] @ 1.000.000 FLOAT-OPS, COOL, RIGHT?

Yet, this code is awfully wrong if thinking about performance.

Let's turn on numpy in-place assignments, avoiding duplicate memory allocations and similar processing-inefficiencies:

def flop_InplaceNUMPY( r, a, b ):
    r += a * b                                   # in-place update, no new array is allocated
    return r

aClk.start(); _ = flop_InplaceNUMPY( rVEC, a, b ); aClk.stop()
2459L
2426L
2658L
2444L
2421L
2430L
2429L
4.??         @ 1.000.000 FLOAT-OPS, COOL, RIGHT? NOT AS SEEN NOW
2.~!____[ms] @ 1.000.000 FLOAT-OPS, HALF, BETTER!
                                          BUT
                                          ALSO TEST THE SCALING
                                          ONCE GONE OFF CACHE,
                                          THAT TEST GET SMELL OF A NEED
                                                              TO OPTIMISE
                                                              CODE DESIGN
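
A naive scaling probe, as a sketch of that off-cache test ( the sizes and the Stopwatch-based timing are illustrative; grow the exponent only as far as RAM permits ):

import numpy as np
from zmq import Stopwatch

aClk = Stopwatch()
a, b = 3.3, 4.4
for anEXP in ( 4, 5, 6, 7, 8 ):           # TUNE: +9, +10, .. only if RAM permits
    r = np.random.uniform( 1, 4, int( 10**anEXP ) )
    aClk.start();  r += a * b;  t = aClk.stop()              # elapsed [us]
    print( "1E+{0:d} elements: {1: >10d} [us] ~ {2: >8.3f} [ns/element]".format( anEXP, t, 1E+3 * t / r.size ) )

Once the per-element cost stops being flat, the vector has left the CPU_core cache hierarchy.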

Cautious experimenters will soon notice that, on sizes above ~1E+9, a naive-code run may even get the python process killed, once an unsatisfiable memory-allocation request gets suffocated and panics to terminate.

This all will bring the otherwise pure-[SERIAL] code on steroids, yet without paying anything but zero add-on costs, and uncle Gene Amdahl will reward to the max the process-scheduling and hardware-architecture knowledge plus the efforts spent during the code-design.

No better advice exists . . . except going into a pure clairvoyance business, where re-testing is never available
