pyspark calculate distance matrix of sparse vectors


Question

I'm trying to build a generic way to calculate a distance matrix of many sparse vectors (100k vectors with a length of 250k). In my example the data is represented in a SciPy CSR matrix. This is what I'm doing:

First I define a method to transform the CSR rows into pyspark SparseVectors:

from pyspark.mllib.linalg import SparseVector

def csr_to_sparse_vector(row):
    # row is a 1 x n CSR row; in canonical form row.indices is already sorted,
    # so the sort below is a no-op that merely guarantees the order SparseVector expects
    return SparseVector(row.shape[1], sorted(row.indices), row.data)
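
For illustration, a minimal sketch of the conversion on a toy CSR matrix (the `toy` variable here is hypothetical, standing in for the 100k x 250k refs_sample):

import numpy as np
from scipy.sparse import csr_matrix

toy = csr_matrix(np.array([[1, 0, 0, 2, 0],
                           [0, 0, 3, 0, 0]]))
print(csr_to_sparse_vector(toy[0]))  # SparseVector(5, {0: 1.0, 3: 2.0})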

Now I transform the rows into vectors, collect them in a list, and feed that list to the SparkContext:

sparse_vectors = [csr_to_sparse_vector(row) for row in refs_sample]
rdd = sc.parallelize(sparse_vectors)

In the next step I use the cartesian function to build all the pairs (similar to this post: Pyspark calculate custom distance between all vectors in an RDD).
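
The cartesian call itself isn't shown here; presumably it is just the plain product of the RDD with itself, as the filter variant further down suggests:

rdd2 = rdd.cartesian(rdd)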

In this experiment I want to use the Jaccard similarity, which I define as follows:

def jacc_sim(pair):
    dot_product = pair[0].dot(pair[1])
    try:
        # For 0/1 vectors this is |A n B| / (|A| + |B|); note that the classic
        # Jaccard index would divide by the union, (|A| + |B| - |A n B|)
        sim = dot_product / (pair[0].numNonzeros() + pair[1].numNonzeros())
    except ZeroDivisionError:
        return 0.0
    return sim
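
As a quick sanity check, a minimal sketch (assuming pyspark.mllib.linalg.SparseVector) of this measure on two toy vectors:

from pyspark.mllib.linalg import SparseVector

a = SparseVector(5, [0, 3], [1.0, 1.0])
b = SparseVector(5, [0, 2, 3], [1.0, 1.0, 1.0])
print(jacc_sim((a, b)))  # 2.0 / (2 + 3) = 0.4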

Now I should just map the function and collect the results:

distance_matrix = rdd2.map(lambda x: jacc_sim(x)).collect()

I'm running this code on a small sample of only 100 documents, both on a local machine and on a cluster with 180 nodes. The task takes forever and finally crashes: https://pastebin.com/UwLUXvUZ

Any suggestions as to what might be wrong?

Additionally, if the distance measure is symmetric, i.e. sim(x,y) == sim(y,x), we only need the upper triangle of the matrix. I found a post that solves this problem by filtering (Upper triangle of cartesian in spark for symmetric operations: `x*(x+1)//2` instead of `x**2`):

rdd2 = rdd.cartesian(rdd).filter(lambda x: x[0] < x[1])

But this doesn't work for the list of SparseVectors, presumably because SparseVector objects don't define a meaningful ordering for the `<` comparison.

Answer

The problem was a configuration error that caused my data to be split into 1000 partitions. The solution was simply to tell Spark explicitly how many partitions it should create (e.g. 10):

rdd = sc.parallelize(sparse_vectors, 10)
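
If you want to double-check the effect, the partition count can be inspected directly on the RDD:

print(rdd.getNumPartitions())  # 10 after the explicit setting above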

Moreover, I extended the list of sparse vectors with an enumeration; this way I could filter out pairs that are not part of the upper triangle of the matrix:

sparse_vectors = [(i, csr_to_sparse_vector(row)) for i, row in enumerate(authors)]
rdd = sc.parallelize(sparse_vectors, 10)
rdd2 = rdd.cartesian(rdd).filter(lambda x: x[0][0] < x[1][0])
rdd2.map(lambda x: jacc_sim(x)).filter(lambda x: x is not None).saveAsTextFile('hdfs:///user/username/similarities')

The corresponding similarity function looks like this:

def jacc_sim(pair):
    # Each element of the pair is an (index, SparseVector) tuple
    id_0, vec_0 = pair[0]
    id_1, vec_1 = pair[1]
    dot_product = vec_0.dot(vec_1)
    try:
        sim = dot_product / (vec_0.numNonzeros() + vec_1.numNonzeros())
        # Only emit pairs with non-zero similarity; everything else becomes None
        # and is filtered out downstream
        if sim > 0:
            return (id_0, id_1, sim)
    except ZeroDivisionError:
        pass
    return None
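
Putting it all together, a minimal local sketch of the pipeline (toy data and collect() instead of saveAsTextFile, purely for illustration; it reuses the jacc_sim defined above):

from pyspark import SparkContext
from pyspark.mllib.linalg import SparseVector

sc = SparkContext("local[2]", "jaccard-demo")
vectors = [SparseVector(5, [0, 3], [1.0, 1.0]),
           SparseVector(5, [0, 2, 3], [1.0, 1.0, 1.0]),
           SparseVector(5, [1], [1.0])]
rdd = sc.parallelize(list(enumerate(vectors)), 2)
pairs = rdd.cartesian(rdd).filter(lambda x: x[0][0] < x[1][0])
print(pairs.map(jacc_sim).filter(lambda x: x is not None).collect())
# [(0, 1, 0.4)] -- the pairs involving vector 2 have no overlap and are dropped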

This worked very well for me and I hope someone else will find it useful as well!
