Using numpy array in shared memory slow when synchronizing access
Question
I have written a program which receives large-ish data sets as input (~150 MB text files), does some math on them, and then reports the results as a histogram. The number of calculations that must be performed is proportional to the number of combinations of two points in the data set, which is extremely large (~5 billion) for a data set of 1 million points.
I was hoping to mitigate some of the computation time by using Python's multiprocessing module to distribute the calculation of partial histogram data to individual processes, while keeping an array of the final histogram in shared memory so that each process can add to it.
I have created a working version of this program with multiprocessing, generally based on the procedure described in this answer. However, I found that it is actually marginally slower than the un-parallelized version I had previously written. I tried un-synchronizing access to the shared array, and found that this speeds things up significantly, but results in a loss of a portion of the data.
Here is a general outline of the code:
import numpy as np
from multiprocessing import Pool, Array

BINS = 200
DMAX = 3.5
DMIN = 0

def init(histo):
    global histo_shared
    histo_shared = histo

def to_np_array(mp_array):
    # view the shared buffer with a dtype matching Array('l');
    # the frombuffer default (float64) would silently misread the longs
    return np.frombuffer(mp_array.get_obj(), dtype=np.int64)

# synchronize access to shared array
def calc_sync(i):
    with histo_shared.get_lock():
        calc_histo(i)

def calc_histo(i):
    # create new array 'd_new' by doing some math on DATA using argument i
    histo = to_np_array(histo_shared)
    histo += np.histogram(d_new, bins=BINS,
                          range=(DMIN, DMAX))[0].astype(np.int32)

def main():
    # read in data and calculate no. of iterations
    global DATA
    DATA = np.loadtxt("data.txt")
    it = len(DATA) // 2

    # create shared array
    histo_shared = Array('l', BINS)

    # write to shared array from different processes
    p = Pool(initializer=init, initargs=(histo_shared,))
    for i in range(1, it + 1):
        p.apply_async(calc_sync, [i])
    p.close()
    p.join()

    histo_final = to_np_array(histo_shared)
    np.savetxt("histo.txt", histo_final)

if __name__ == '__main__':
    main()
Is there something I'm missing here that's having a serious impact on my performance? Is there any way I can get around this issue to speed things up?
Any insights or suggestions are greatly appreciated!
Answer
You are essentially locking out any parallelism you might be getting, because there is a lock on your data the entire time you are processing.
While this method
def calc_sync(i):
    with histo_shared.get_lock():
        calc_histo(i)
is executing, you place a lock on the entire shared dataset while you're processing the histogram. Notice also that
def calc_histo(i):
    # create new array 'd_new' by doing some math on DATA using argument i
    histo = to_np_array(histo_shared)
    histo += np.histogram(d_new, bins=BINS,
                          range=(DMIN, DMAX))[0].astype(np.int32)
isn't doing anything with i, so it just looks like you're processing the same data over and over. What is d_new? I don't see it defined in your listing.
Ideally, what you should be doing is taking your large dataset, slicing it into some number of chunks, processing each chunk individually, and then combining the results. Only lock the shared data, not the processing steps. That might look something like this:
def calc_histo(slice):
    # process the slice asynchronously
    return np.histogram(slice, bins=BINS,
                        range=(DMIN, DMAX))[0].astype(np.int32)
def calc_sync(start, stop):
    # grab a chunk of data; you likely don't need to lock this read
    chunk = raw_data[start:stop]
    # actual calculation is async
    result = calc_histo(chunk)
    with histo_shared.get_lock():
        # add into a numpy view of the shared buffer
        histo_np = to_np_array(histo_shared)
        histo_np += result
For pairwise data:
def calc_sync(part1, part2):
    output = []  # or a numpy array
    # actual calculation is async
    for i in range(part1):
        for j in range(part2):
            # do whatever computation you need and append it to output
            pass
    result = calc_histo(output)
    with histo_shared.get_lock():
        # only the final combination needs the lock
        histo_np = to_np_array(histo_shared)
        histo_np += result
Now
p = Pool(initializer=init, initargs=(histo_shared,))
for i in range(1, it + 1, slice_size):
    for j in range(1, it + 1, slice_size):
        p.apply_async(calc_sync, [DATA[j:j + slice_size], DATA[i:i + slice_size]])
In words, we take pairwise cuts of the data, generate the relevant values, and then put them in a histogram. The only real synchronization you need is when you're combining data in the histogram.
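Putting this advice together, here is one minimal runnable way to realize it (the names `worker_histo` and `parallel_histogram` are illustrative, not from the original answer): each worker histograms only its own chunk, and the parent combines the partial results, so even the lock disappears.

```python
import numpy as np
from multiprocessing import Pool

BINS = 200
DMIN, DMAX = 0.0, 3.5

def worker_histo(chunk):
    # each worker histograms only its own slice; no shared state, no lock
    return np.histogram(chunk, bins=BINS, range=(DMIN, DMAX))[0]

def parallel_histogram(data, n_chunks=4):
    chunks = np.array_split(data, n_chunks)
    with Pool(n_chunks) as p:
        partials = p.map(worker_histo, chunks)
    # combining partial histograms is a cheap elementwise sum in the parent
    return np.sum(partials, axis=0)

if __name__ == "__main__":
    rng = np.random.default_rng(0)
    data = rng.uniform(DMIN, DMAX, 100_000)
    histo = parallel_histogram(data)
    assert histo.sum() == len(data)  # every point landed in some bin
```

Because a histogram is additive over disjoint partitions of the data, summing the partial histograms gives exactly the same result as histogramming everything at once, and the only "synchronization" left is an ordinary reduction in a single process.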