Faster code to calculate distance between points in numpy array with cyclic (periodic) boundary conditions


Problem Description


I know how to calculate the Euclidean distance between points in an array using scipy.spatial.distance.cdist

Similar to answers to this question: Calculate Distances Between One Point in Matrix From All Other Points

However, I would like to make the calculation assuming cyclic boundary conditions, e.g. so that point [0,0] is distance 1 from point [0,n-1] in this case, not a distance of n-1. (I will then make a mask for all points within a threshold distance of my target cells, but that is not central to the question).
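In other words, along each axis the wrapped separation is min(|Δ|, n − |Δ|). A minimal sketch of that one-axis metric (my own illustration, not part of the attempts below):

def wrapped_delta(x1, x2, n):
    # separation along one axis of a length-n periodic box
    d = abs(x1 - x2)
    return min(d, n - d)

wrapped_delta(0, 4, 5)   # -> 1: points 0 and n-1 are neighbours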

The only way I can think of is to repeat the calculation 9 times, with the domain indices having n added/subtracted in the x, y and then x&y directions, and then stacking the results and finding the minimum across the 9 slices. To illustrate the need for 9 repetitions, I put together a simple schematic with just 1 J-point (marked with a circle), showing an example where the cell marked by the triangle has its nearest neighbour in the copy of the domain shifted to the top-left.

This is the code I developed for this using cdist:

import numpy as np
from scipy import spatial
    
n=5 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n)) 
i=np.argwhere(a>-1)  # all points, for each loc we want distance to nearest J 
j=np.argwhere(a>0.85) # set of J locations to find distance to.

# this will be used in the KDtree soln 
global maxdist
maxdist=2.0

def dist_v1(i,j):
    dist=[]
    # 3x3 search required for periodic boundaries.
    for xoff in [-n,0,n]:
        for yoff in [-n,0,n]:
            jo=j.copy()
            jo[:,0]-=xoff
            jo[:,1]-=yoff
            dist.append(np.amin(spatial.distance.cdist(i,jo,metric='euclidean'),1)) 
    dist=np.amin(np.stack(dist),0).reshape([n,n])
    return(dist)

This works and produces, e.g.:

print(dist_v1(i,j))


[[1.41421356 1.         1.41421356 1.41421356 1.        ]
 [2.23606798 2.         1.41421356 1.         1.41421356]
 [2.         2.         1.         0.         1.        ]
 [1.41421356 1.         1.41421356 1.         1.        ]
 [1.         0.         1.         1.         0.        ]]

The zeros obviously mark the J points, and the distances are correct; for example, the top-left cell is sqrt(2) ≈ 1.414 from the J point at [4,1] once the boundaries wrap (this EDIT corrects my earlier attempt, which was incorrect).

Note that if you change the last two lines to stack the raw distances and then only use one minimum, like this:

def dist_v2(i,j):
    dist=[]
    # 3x3 search required for periodic boundaries.
    for xoff in [-n,0,n]:
        for yoff in [-n,0,n]:
            jo=j.copy()
            jo[:,0]-=xoff
            jo[:,1]-=yoff
            dist.append(spatial.distance.cdist(i,jo,metric='euclidean')) 
    dist=np.amin(np.dstack(dist),(1,2)).reshape([n,n])
    return(dist)

it is faster for small n (< 10) but considerably slower for larger arrays (n > 10).

...but either way, it is slow for my large arrays (n=500, with around 70 J points): this search takes up about 99% of the calculation time (and using the loops is a bit ugly too). Is there a better/faster way?

The other options I thought of were:

  1. scipy.spatial.KDTree.query_ball_point

With further searching I have found that there is a function scipy.spatial.KDTree.query_ball_point which directly calculates the coordinates within a radius of my J-points, but it doesn't seem to have any facility to use periodic boundaries, so I presume one would still need to somehow use a 3x3 loop, stack and then use amin as I do above, so I'm not sure if this will be any faster.
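That said, if your SciPy is recent enough, cKDTree accepts a boxsize argument that imposes the periodic (toroidal) topology on the tree itself, so no 3x3 tiling would be needed. A minimal sketch, assuming a SciPy version that supports boxsize (I have not checked which release introduced it):

from scipy import spatial

def dist_kdtree_periodic(i, j, n):
    # all coordinates must lie in [0, n) for boxsize to apply
    tree = spatial.cKDTree(j, boxsize=n)
    d, _ = tree.query(i, k=1)   # distance from each point to its nearest J point
    return d.reshape((n, n))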

I coded up a solution using this function WITHOUT worrying about the periodic boundary conditions (i.e. this doesn't answer my question):

def dist_v3(n,j):
    x, y = np.mgrid[0:n, 0:n]
    points = np.c_[x.ravel(), y.ravel()]
    tree=spatial.KDTree(points)
    mask=np.zeros([n,n])
    for results in tree.query_ball_point((j), maxdist):
        mask[points[results][:,0],points[results][:,1]]=1
    return(mask)

Maybe I'm not using it in the most efficient way, but even without the periodic boundaries this is already as slow as my cdist-based solutions. Including the mask function in the two cdist solutions (i.e. replacing the return(dist) with return(np.where(dist<=maxdist,1,0)) in those functions) and then using timeit, I get the following timings for n=100:

from timeit import timeit

print("cdist v1:",timeit(lambda: dist_v1(i,j), number=3)*100)
print("cdist v2:",timeit(lambda: dist_v2(i,j), number=3)*100)
print("KDtree:", timeit(lambda: dist_v3(n,j), number=3)*100)

cdist v1: 181.80927299981704
cdist v2: 554.8205785999016
KDtree: 605.119637199823

  2. Make an array of relative coordinates for points within a set distance of [0,0] and then manually loop over the J points, setting up a mask with this list of relative points (a minimal sketch follows after this list). This has the advantage that the "relative distance" calculation is only performed once (my J points change each timestep), but I suspect the looping will be very slow.

  3. Precalculate a set of masks for EVERY point in the 2D domain, so in each timestep of the model integration I just pick out the mask for the J-point and apply it. This would use a LOT of memory (proportional to n^4: for n=500 that is 500^4 ≈ 6.3×10^10 mask entries) and perhaps is still slow, as you need to loop over the J points to combine the masks.
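A minimal sketch of option 2 (my own, with hypothetical names; the offset table is built once and reused every timestep):

import numpy as np

def mask_v4(j, n, maxdist):
    # once: relative offsets of all points within maxdist of [0,0]
    r = int(np.ceil(maxdist))
    oi, oj = np.mgrid[-r:r+1, -r:r+1]
    keep = (oi**2 + oj**2) <= maxdist**2
    oi, oj = oi[keep], oj[keep]

    # every timestep: stamp the offsets around each J point, with wraparound
    mask = np.zeros((n, n), dtype=int)
    for pi, pj in j:
        mask[(pi + oi) % n, (pj + oj) % n] = 1
    return mask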

Solution

[EDIT] - I found a mistake in the way the code keeps track of the points that are already done, and fixed it with the mask_kernel. The pure python version of the newer code is ~1.5 times slower, but the numba version is slightly faster (due to some other optimisations).

[current best: ~100x to 120x the original speed]

First of all, thank you for submitting this problem, I had a lot of fun optimizing it!

My current best solution relies on the assumption that the grid is regular and that the "source" points (the ones from which we need to compute the distance) are roughly evenly distributed.

The idea here is that all of the distances on the grid are going to be one of 1, sqrt(2), 2, sqrt(5), ... (the square roots of sums of two integer squares), so we can do the numerical calculation beforehand. Then we simply put these values in a matrix and copy that matrix around each source point (making sure to keep the minimum value found at each point). This covers the vast majority of the points (>99%). Then we apply another, more "classical" method for the remaining 1%.
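For example, with kern_size=2 the precomputed matrix of squared distances from the centre cell is:

import numpy as np

vals = np.arange(-2, 3)**2           # [4, 1, 0, 1, 4]
kernel = np.add.outer(vals, vals)    # squared distance from the centre
# [[8 5 4 5 8]
#  [5 2 1 2 5]
#  [4 1 0 1 4]
#  [5 2 1 2 5]
#  [8 5 4 5 8]]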

Here's the code:

import numpy as np

def sq_distance(x1, y1, x2, y2, n): 
    # computes the pairwise squared distance between 2 sets of points (with periodicity)
    # x1, y1 : coordinates of the first set of points (source)
    # x2, y2 : same
    dx = np.abs((np.subtract.outer(x1, x2) + n//2)%(n) - n//2)
    dy = np.abs((np.subtract.outer(y1, y2) + n//2)%(n) - n//2)
    d  = (dx*dx + dy*dy)
    return d

def apply_kernel(sources, sqdist, kern_size, n, mask):
    ker_i, ker_j = np.meshgrid(np.arange(-kern_size, kern_size+1), np.arange(-kern_size, kern_size+1), indexing="ij")
    kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
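    # cells of the square kernel outside the inscribed circle of radius
    # kern_size are not guaranteed minimal, so they stay flagged "unreached"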
    mask_kernel = kernel > kern_size**2

    for pi, pj in sources:
        ind_i = (pi+ker_i)%n
        ind_j = (pj+ker_j)%n
        sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
        mask[ind_i,ind_j] *= mask_kernel

def dist_vf(sources, n, kernel_size):
    sources = np.asfortranarray(sources) #for memory contiguity

    kernel_size = min(kernel_size, n//2)
    kernel_size = max(kernel_size, 1)

    sqdist = np.full((n,n), 10*n**2, dtype=np.int32) #preallocate with a huge distance (>max**2)
    mask   = np.ones((n,n), dtype=bool)              #which points have not been reached?

    #main code
    apply_kernel(sources, sqdist, kernel_size, n, mask) 

    #remaining points
    rem_i, rem_j = np.nonzero(mask)
    if len(rem_i) > 0:
        sq_d = sq_distance(sources[:,0], sources[:,1], rem_i, rem_j, n).min(axis=0)
        sqdist[rem_i, rem_j] = sq_d

    #eff = 1-rem_i.size/n**2
    #print("covered by kernel :", 100*eff, "%")
    #print("overlap :", sources.shape[0]*(1+2*kernel_size)**2/n**2)
    #print()

    return np.sqrt(sqdist)

Testing this version with

n=500  # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n)) 
all_points=np.argwhere(a>-1)  # all points, for each loc we want distance to nearest J 
source_points=np.argwhere(a>1-70/n**2) # set of J locations to find distance to.

#
# code for dist_v1 and dist_vf
#

overlap=5.2
kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)

print("cdist v1      :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
print("kernel version:", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")

gives

cdist v1      : 1148.6694 ms
kernel version: 69.21876999999998 ms

which is already a ~17x speedup! I also implemented a numba version of sq_distance and apply_kernel: [this is the new correct version]

from numba import njit

@njit(cache=True)
def sq_distance(x1, y1, x2, y2, n):
    m1 = x1.size
    m2 = x2.size
    n2 = n//2
    d = np.empty((m1,m2), dtype=np.int32)
    for i in range(m1):
        for j in range(m2):
            dx = np.abs(x1[i] - x2[j] + n2)%n - n2
            dy = np.abs(y1[i] - y2[j] + n2)%n - n2
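            # note: taking abs() before the modulo matches the numpy version
            # above only for even n (as here, n=500); dx and dy can come out
            # negative, but squaring below removes the sign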
            d[i,j]  = (dx*dx + dy*dy)
    return d

@njit(cache=True)
def apply_kernel(sources, sqdist, kern_size, n, mask):
    # creating the kernel
    kernel = np.empty((2*kern_size+1, 2*kern_size+1))
    vals = np.arange(-kern_size, kern_size+1)**2
    for i in range(2*kern_size+1):
        for j in range(2*kern_size+1):
            kernel[i,j] = vals[i] + vals[j]
    mask_kernel = kernel > kern_size**2

    I = sources[:,0]
    J = sources[:,1]

    # applying the kernel for each point
    for l in range(sources.shape[0]):
        pi = I[l]
        pj = J[l]

        if pj - kern_size >= 0 and pj + kern_size<n: #if we are in the middle, no need to do the modulo for j
            for i in range(2*kern_size+1):
                ind_i = np.mod((pi+i-kern_size), n)
                for j in range(2*kern_size+1):
                    ind_j = (pj+j-kern_size)
                    sqdist[ind_i,ind_j] = np.minimum(kernel[i,j], sqdist[ind_i,ind_j])
                    mask[ind_i,ind_j] = mask_kernel[i,j] and mask[ind_i,ind_j]

        else:
            for i in range(2*kern_size+1):
                ind_i = np.mod((pi+i-kern_size), n)
                for j in range(2*kern_size+1):
                    ind_j = np.mod((pj+j-kern_size), n)
                    sqdist[ind_i,ind_j] = np.minimum(kernel[i,j], sqdist[ind_i,ind_j])
                    mask[ind_i,ind_j] = mask_kernel[i,j] and mask[ind_i,ind_j]
    return

and testing with

overlap=5.2
kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)

print("cdist v1                :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
print("kernel numba (first run):", timeit(lambda: dist_vf(source_points, n, kernel_size), number=1)*1000, "ms") #first run = cimpilation = long
print("kernel numba            :", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")

which gave the following results

cdist v1                : 1163.0742 ms
kernel numba (first run): 2060.0802 ms
kernel numba            : 8.80377000000001 ms

Due to the JIT compilation, the first run is pretty slow, but otherwise it's a 120x improvement! (With cache=True, the compiled functions are also cached to disk, so this cost is only paid once rather than on every session.)

It may be possible to get a little bit more out of this algorithm by tweaking the kernel_size parameter (or the overlap). The current choice of kernel_size is only effective for a small number of source points. For example, this choice fails miserably with source_points=np.argwhere(a>0.85) (13s) while manually setting kernel_size=5 gives the answer in 22ms.
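One way to guard against that (a sketch of the same heuristic, with a hypothetical floor min_size that would have to be tuned; for the a>0.85 case above it returns the manual value 5):

import numpy as np

def pick_kernel_size(n, n_sources, overlap=5.2, min_size=5):
    # heuristic: total stamped kernel area ≈ overlap × domain area
    ks = int(np.sqrt(overlap * n**2 / n_sources) / 2)
    return max(min_size, min(ks, n // 2))   # same clamping as in dist_vf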

I hope my post isn't (unnecessarily) too complicated, I don't really know how to organise it better.

[EDIT 2]:

I gave a little more attention to the non-numba part of the code and managed to get a pretty significant speedup, getting very close to what numba could achieve. Here is the new version of the function apply_kernel:

def apply_kernel(sources, sqdist, kern_size, n, mask):
    ker_i = np.arange(-kern_size, kern_size+1).reshape((2*kern_size+1,1))
    ker_j = np.arange(-kern_size, kern_size+1).reshape((1,2*kern_size+1))

    kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
    mask_kernel = kernel > kern_size**2

    for pi, pj in sources:

        imin = pi-kern_size
        jmin = pj-kern_size
        imax = pi+kern_size+1
        jmax = pj+kern_size+1
        if imax < n and jmax < n and imin >=0 and jmin >=0: # we are inside
            sqdist[imin:imax,jmin:jmax] = np.minimum(kernel, sqdist[imin:imax,jmin:jmax])
            mask[imin:imax,jmin:jmax] *= mask_kernel

        elif imax < n and imin >=0:
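            # the row (i) range is fully inside; only the column indices wrap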
            ind_j = (pj+ker_j.ravel())%n
            sqdist[imin:imax,ind_j] = np.minimum(kernel, sqdist[imin:imax,ind_j])
            mask[imin:imax,ind_j] *= mask_kernel

        elif jmax < n and jmin >=0:
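            # the column (j) range is fully inside; only the row indices wrap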
            ind_i = (pi+ker_i.ravel())%n
            sqdist[ind_i,jmin:jmax] = np.minimum(kernel, sqdist[ind_i,jmin:jmax])
            mask[ind_i,jmin:jmax] *= mask_kernel

        else :
            ind_i = (pi+ker_i)%n
            ind_j = (pj+ker_j)%n
            sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
            mask[ind_i,ind_j] *= mask_kernel

The main optimisations are

  • Indexing with slices wherever possible (rather than with dense index arrays)
  • Using broadcastable "sparse" index arrays of shape (2k+1, 1) and (1, 2k+1) instead of dense meshgrids (why didn't I think of that earlier?); a toy comparison follows below
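A toy comparison of the three indexing styles (my own sketch, not from the timed code; exact numbers will vary by machine):

import numpy as np
from timeit import timeit

a = np.zeros((500, 500))
k = np.arange(-5, 6)                          # offsets for kern_size=5
dense_i, dense_j = np.meshgrid(k, k, indexing="ij")
sparse_i, sparse_j = k.reshape((-1, 1)), k.reshape((1, -1))
pi = pj = 250                                 # a point well inside the domain

t_dense  = timeit(lambda: a[(pi+dense_i) % 500, (pj+dense_j) % 500], number=10000)
t_sparse = timeit(lambda: a[(pi+sparse_i) % 500, (pj+sparse_j) % 500], number=10000)
t_slice  = timeit(lambda: a[pi-5:pi+6, pj-5:pj+6], number=10000)
print(t_dense, t_sparse, t_slice)             # expect t_slice < t_sparse < t_dense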

Testing with

overlap=5.4
kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)

print("cdist v1  :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
print("kernel v2 :", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")

gives

cdist v1  : 1209.8163000000002 ms
kernel v2 : 11.319049999999997 ms

which is a nice 100x improvement over cdist, a ~5.5x improvement over the previous numpy-only version and just ~25% slower than what I could achieve with numba.
