Using numpy array in shared memory slow when synchronizing access


Problem description

I have written a program which receives large-ish data sets as input (~150mb text files), does some math on them, and then reports the results in a histogram. The number of calculations that must be performed is proportional to the number of combinations of two points in the data set, which is extremely large (~5 billion) for a data set of 1 million points.

I was hoping to mitigate some of the computation time by using Python's multiprocessing module to distribute the calculation of partial histogram data to individual processes while keeping an array of the final histogram in shared memory so that each process can add to it.

I have created a working version of this program with multiprocessing generally based on the procedure described in this answer, however I found that it is actually marginally slower than the un-parallelized version I had previously written. I tried un-synchronizing access to the shared array, and found that this speeds things up significantly, but results in a loss of a portion of the data.
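
The lost data without the lock is the classic lost-update race: the += on the shared bins is a read-modify-write, so two processes can read the same counts, each add their own contribution, and the later write silently discards the earlier one. For reference, the unsynchronized variant amounts to nothing more than calling the worker without the lock (a sketch in terms of the names defined in the overview below):

def calc_unsync(i):
    # hypothetical variant of calc_sync without histo_shared.get_lock(): the
    # += inside calc_histo now races with other processes, so updates get lost
    calc_histo(i)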

Here is a general overview of the code:

import numpy as np
from multiprocessing import Pool, Array

BINS = 200
DMAX = 3.5
DMIN = 0

def init(histo):
    global histo_shared
    histo_shared = histo

def to_np_array(mp_array):
    # view the shared buffer with a dtype matching the Array's 'l' typecode
    return np.frombuffer(mp_array.get_obj(), dtype='l')

# synchronize access to shared array
def calc_sync(i):
    with histo_shared.get_lock():
        calc_histo(i)

def calc_histo(i):
    # create new array 'd_new' by doing some math on DATA using argument i
    histo = to_np_array(histo_shared)
    histo += np.histogram(d_new, bins=BINS,
        range=(DMIN, DMAX))[0].astype(np.int32)

def main():
    # read in data and calculate no. of iterations
    global DATA
    DATA = np.loadtxt("data.txt")
    it = len(DATA) // 2

    # create shared array 
    histo_shared = Array('l',  BINS)

    # write to shared array from different processes
    p = Pool(initializer=init, initargs=(histo_shared,))
    for i in range(1, it + 1):
        p.apply_async(calc_sync, [i])
    p.close()
    p.join()

    histo_final = to_np_array(histo_shared)
    np.savetxt("histo.txt", histo_final)

if __name__ == '__main__':
    main()

Is there something I'm missing here that's having a serious impact on my performance? Is there any way I can get around this issue to speed things up?

Any insights or suggestions are greatly appreciated!

Recommended answer

You are essentially locking out any parallelism you might be getting because there is a lock on your data the entire time you are processing.

When this method

def calc_sync(i):
    with histo_shared.get_lock():
        calc_histo(i)

is executing, you hold a lock on the entire shared dataset while you're processing the histogram. Notice also that

def calc_histo(i):
    # create new array 'd_new' by doing some math on DATA using argument i
    histo = to_np_array(histo_shared)
    histo += np.histogram(d_new, bins=BINS,
        range=(DMIN, DMAX))[0].astype(np.int32)

isn't doing anything with i, so it just looks like you're processing the same data over again. What is d_new? I don't see it in your listing.

Ideally, what you should be doing is taking your large dataset, slicing it into some number of chunks, processing them individually, and then combining the results. Only lock the shared data, not the processing steps. This might look something like this:

def calc_histo(data):
    # histogram one batch of values; runs without touching the shared array
    return np.histogram(data, bins=BINS,
        range=(DMIN, DMAX))[0].astype(np.int32)

def calc_sync(start, stop):
    # grab a chunk of the raw data; you likely don't need to lock this
    chunk = raw_data[start:stop]

    # the actual calculation runs without the lock
    result = calc_histo(chunk)

    # only lock while merging into the shared histogram
    with histo_shared.get_lock():
        histo = to_np_array(histo_shared)
        histo += result

For pairwise data:

def calc_sync(part1, part2):
    output = []  # or a numpy array

    # the actual calculation runs without the lock: combine every point in
    # part1 with every point in part2
    for x in part1:
        for y in part2:
            pass  # do whatever computation you need on (x, y) and append it to output

    result = calc_histo(output)

    # only lock while merging into the shared histogram
    with histo_shared.get_lock():
        histo = to_np_array(histo_shared)
        histo += result

Now:

p = Pool(initializer=init, initargs=(histo_shared,))
for i in range(0, it, slice_size):
    for j in range(0, it, slice_size):
        p.apply_async(calc_sync, [raw_data[i:i + slice_size], raw_data[j:j + slice_size]])

In other words, we take pairwise cuts of the data, generate the relevant values, and then put them in a histogram. The only real synchronization you need is when you're combining data in the shared histogram.
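
Putting the pieces together, a minimal end-to-end sketch of this scheme could look like the following. The chunk size SLICE, the absolute-difference computation standing in for whatever actually produces d_new, and passing the raw data to the workers through initargs are all illustrative assumptions rather than the original code:

import numpy as np
from multiprocessing import Pool, Array

BINS = 200
DMAX = 3.5
DMIN = 0
SLICE = 10000  # chunk size; tune to the data set

def init(histo, data):
    # runs once per worker: stash the shared histogram and the raw data
    global histo_shared, raw_data
    histo_shared = histo
    raw_data = data

def to_np_array(mp_array):
    # view the shared buffer with a dtype matching the Array's 'l' typecode
    return np.frombuffer(mp_array.get_obj(), dtype='l')

def calc_sync(i, j):
    part1 = raw_data[i:i + SLICE]
    part2 = raw_data[j:j + SLICE]
    # placeholder for the real pairwise math: absolute differences of all pairs
    values = np.abs(part1[:, None] - part2[None, :]).ravel()
    result = np.histogram(values, bins=BINS, range=(DMIN, DMAX))[0]
    # the only synchronized step: merging the partial result into the shared bins
    with histo_shared.get_lock():
        histo = to_np_array(histo_shared)
        histo += result

def main():
    data = np.loadtxt("data.txt")
    histo = Array('l', BINS)
    p = Pool(initializer=init, initargs=(histo, data))
    for i in range(0, len(data), SLICE):
        for j in range(i, len(data), SLICE):  # j starts at i so each chunk pair is visited once
            p.apply_async(calc_sync, (i, j))
    p.close()
    p.join()
    np.savetxt("histo.txt", to_np_array(histo))

if __name__ == '__main__':
    main()

If the locked merge itself ever becomes the bottleneck, a variation on the same idea is to have each task return its partial histogram and let the parent sum the results, which removes the shared array (and the lock) entirely.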
