Python multiprocessing performance

Problem description

This should be my third and final question about my attempts to improve the performance of some statistical analysis that I am doing with Python. I have two versions of my code (single core vs. multiprocessing), and I was expecting to gain performance by using multiple cores, since my code has to decompress/unpack quite a few binary strings. Sadly, I noticed that performance actually decreased when using multiple cores.

I am wondering if anyone has a possible explanation for what I am observing (scroll down to the April 16th update for more information)?

The key part of the program is the function numpy_array (plus decode in the multiprocessing version); a snippet is shown below (the full code is accessible via the pastebin links further down):

import base64
import struct

def numpy_array(data, peaks):
    # Decode each base64-encoded, big-endian packed peak list into `data`.
    rt_counter=0
    for x in peaks:
        if rt_counter %(len(peaks)/20) == 0:
            update_progress()            # progress-bar helper defined elsewhere in the full code
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4       # number of 32-bit values (Python 2 integer division)
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            # reinterpret each unsigned 32-bit integer as a float
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[rt_counter][1][peak_counter][0]=float(buff2)
            else:
                data[rt_counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        rt_counter+=1

The multiprocessing version performs this with a set of functions; the two key ones are shown below:

import base64
import contextlib
import multiprocessing as mp
import struct
import numpy as np

def tonumpyarray(mp_arr):
    # view the shared multiprocessing array as a flat numpy array (no copy)
    return np.frombuffer(mp_arr.get_obj())

def numpy_array(shared_arr,peaks):
    processors=mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        # split the peak list into one contiguous chunk per worker
        chunk_size=int(len(peaks)/processors)
        map_parameters=[]
        for i in range(processors):
            counter = i*chunk_size
            chunk=peaks[i*chunk_size:(i+1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode,map_parameters)

def decode((chunk, counter)):
    # re-view the flat shared buffer as a structured array: per record one
    # float32 scalar plus a 250000x2 block of float32 value pairs
    data=tonumpyarray(shared_arr).view(
        [('f0','<f4'), ('f1','<f4',(250000,2))])
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            #with shared_arr.get_lock():
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        counter+=1
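
The snippet above also relies on pool_init and shared_arr, which are defined in the full code on pastebin (links below). A minimal sketch of what such an initializer typically looks like (an assumption added here for readability, not the original code) is:

def pool_init(shared_arr_):
    # store the shared multiprocessing array in a module-level global so that
    # decode(), running inside each worker process, can reach it via tonumpyarray()
    global shared_arr
    shared_arr = shared_arr_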

The full program code can be accessed via these pastebin links:

Pastebin for the single-core version

Pastebin for the multiprocessing version

The performance that I am observing with a file containing 239 timepoints and ~180k measurement pairs per timepoint is ~2.5 minutes for the single-core version and ~3.5 minutes for multiprocessing.

PS: The two previous questions (from my first ever attempts at parallelization):

  1. Python multi-processing
  2. Making my NumPy array shared across processes

- April 16th update -

I have been profiling my program with the cProfile library (calling cProfile.run('main()') under __main__), which shows that there is one step slowing everything down:

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
23   85.859    3.733   85.859    3.733 {method 'acquire' of 'thread.lock' objects}
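
For reference, the profiling setup described above amounts to something like this minimal sketch (main() is the script's existing entry point; the sort argument is an optional addition, not part of the original call):

import cProfile

if __name__ == '__main__':
    # profile the full run and sort the report by cumulative time
    cProfile.run('main()', sort='cumulative')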

The thing that I do not understand here is that thread.lock objects are used in threading (to my understanding), but they should not be used in multiprocessing, as each core should run a single thread (besides having its own locking mechanism). So how does this occur, and why does a single call take 3.7 seconds?

Recommended answer

Shared data is a known case of slowdowns due to synchronization.

Can you split your data among processes, or give each process an independent copy? Then your processes would not need to synchronize anything up until the moment when all calculations are done.

Then I'd let the master process join the output of all worker processes into one coherent set.

The approach may take extra RAM, but RAM is cheap nowadays.
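
As a rough illustration of that suggestion, here is a minimal sketch (not the original program; decode_chunk and numpy_array_no_sharing are hypothetical names) in which each worker decodes its chunk into private NumPy arrays and returns them, so nothing has to be synchronized until the master process joins the results:

import base64
import struct
import multiprocessing as mp
import numpy as np

def decode_chunk(chunk):
    # Decode one chunk of base64-encoded, big-endian packed peaks into private
    # float32 arrays; nothing is shared, so no locks are taken while unpacking.
    rows = []
    for x in chunk:
        data_buff = base64.b64decode(x)
        buff_size = len(data_buff) // 4
        values = struct.unpack(">%dL" % buff_size, data_buff)
        floats = [struct.unpack("f", struct.pack("I", v))[0] for v in values]
        rows.append(np.array(floats, dtype='<f4').reshape(-1, 2))
    return rows

def numpy_array_no_sharing(peaks):
    processors = mp.cpu_count()
    chunk_size = max(1, len(peaks) // processors)
    chunks = [peaks[i:i + chunk_size] for i in range(0, len(peaks), chunk_size)]
    pool = mp.Pool(processes=processors)
    try:
        # each worker returns its own copy of the decoded data; the master
        # process joins the per-chunk results into one coherent list
        results = pool.map(decode_chunk, chunks)
    finally:
        pool.close()
        pool.join()
    return [timepoint for chunk_result in results for timepoint in chunk_result]

The trade-off is the one described above: each worker holds a private copy of its chunk's output, so peak memory is higher, but no lock acquisitions are needed during the decoding itself.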

If you ask, I'm also puzzled by 3700 ms per thread lock acquisition. OTOH profiling may be mistaken about special calls like this.
