pyspark calculate distance matrix of sparse vectors
Question
I'm trying to build a generic way to calculate a distance matrix of many sparse vectors (100k vectors with a length of 250k). In my example, the data is represented as a scipy CSR matrix. This is what I'm doing:
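For reference, refs_sample in the snippets below is assumed to be that scipy CSR matrix; a minimal stand-in (random data, purely for illustration) could be built like this:

from scipy import sparse

# hypothetical stand-in for refs_sample: 100 random sparse rows of length 250k
refs_sample = sparse.random(100, 250000, density=0.0001, format='csr', random_state=42)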
First I define a method to transform the CSR rows into pyspark SparseVectors:
from pyspark.mllib.linalg import SparseVector  # pyspark.ml.linalg provides the same class

def csr_to_sparse_vector(row):
    # row is a 1 x n CSR row; SparseVector expects (size, indices, values)
    return SparseVector(row.shape[1], sorted(row.indices), row.data)
Now I transform the rows into vectors and save them in a list, which I then feed to the SparkContext:
sparse_vectors = [csr_to_sparse_vector(row) for row in refs_sample]
rdd = sc.parallelize(sparse_vectors)
In the next step I use the cartesian function to build all the pairs (similar to this post: Pyspark calculate custom distance between all vectors in a RDD)
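The pairing step itself would then simply be a Cartesian product of the RDD with itself (a minimal sketch; rdd2 is the name used in the snippets below):

# all pairs of vectors (n^2 pairs, including both (x, y) and (y, x))
rdd2 = rdd.cartesian(rdd)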
In this experiment I want to use the Jaccard similarity, which is defined as follows:
def jacc_sim(pair):
    # similarity = dot product over the total number of nonzero entries in both vectors
    dot_product = pair[0].dot(pair[1])
    try:
        sim = dot_product / (pair[0].numNonzeros() + pair[1].numNonzeros())
    except ZeroDivisionError:
        return 0.0
    return sim
Now I should just map the function and collect the results:
distance_matrix = rdd2.map(lambda x: jacc_sim(x)).collect()
I'm running this code on a small sample with only 100 documents, on both a local machine and a cluster with 180 nodes. The task takes forever and finally crashes: https://pastebin.com/UwLUXvUZ
Any suggestions what might be wrong?
Additionally, if the distance measure is symmetric, i.e. sim(x,y) == sim(y,x), we only need the upper triangle of the matrix. I found a post that solves this problem by filtering (Upper triangle of cartesian in spark for symmetric operations: `x*(x+1)//2` instead of `x**2`):
rdd2 = rdd.cartesian(rdd).filter(lambda x: x[0] < x[1])
But this doesn't work for the list of SparseVectors.
Answer
The problem was a configuration error that caused my data to be split into 1000 partitions. The solution was simply to tell Spark explicitly how many partitions it should create (e.g. 10):
rdd = sc.parallelize(sparse_vectors, 10)
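As a quick sanity check (not part of the original fix), the partition count can be verified before running the expensive cartesian step:

# confirm the explicit partitioning took effect
print(rdd.getNumPartitions())  # expected: 10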
Moreover, I extended the list of sparse vectors with an enumeration; this way I could then filter out pairs that are not part of the upper triangle of the matrix:
sparse_vectors = [(i, csr_to_sparse_vector(row)) for i, row in enumerate(authors)]
rdd = sc.parallelize(sparse_vectors, 10)
rdd2 = rdd.cartesian(rdd).filter(lambda x: x[0][0] < x[1][0])
rdd2.map(lambda x: jacc_sim(x)).filter(lambda x: x is not None).saveAsTextFile('hdfs:///user/username/similarities')
The corresponding similarity function looks like this:
def jacc_sim(pair):
    id_0 = pair[0][0]
    vec_0 = pair[0][1]
    id_1 = pair[1][0]
    vec_1 = pair[1][1]
    dot_product = vec_0.dot(vec_1)
    try:
        sim = dot_product / (vec_0.numNonzeros() + vec_1.numNonzeros())
        if sim > 0:
            return (id_0, id_1, sim)
    except ZeroDivisionError:
        pass
    return None
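A quick local check of the function with two toy indexed vectors (arbitrary values, just to illustrate the expected output format):

from pyspark.mllib.linalg import SparseVector

# two small indexed vectors that share one nonzero position
pair = ((0, SparseVector(5, [0, 2], [1.0, 1.0])),
        (1, SparseVector(5, [2, 4], [1.0, 1.0])))
print(jacc_sim(pair))  # (0, 1, 0.25): dot product 1.0 over 2 + 2 nonzero entries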
This worked very well for me and I hope someone else will find it useful as well!