Calculating the cosine similarity between all the rows of a dataframe in pyspark


Question

I have a dataset containing workers with their demographic information (age, gender, address, etc.) and their work locations. I created an RDD from the dataset and converted it into a DataFrame.

There are multiple entries for each ID. Hence, I created a DataFrame which contained only the ID of the worker and the various office locations he/she had worked at.

    |----|----------------------------|
    | ID | Office_Loc                 |
    |----|----------------------------|
    | 1  | Delhi, Mumbai, Gandhinagar |
    | 2  | Delhi, Mandi               |
    | 3  | Hyderbad, Jaipur           |
    |----|----------------------------|

I want to calculate the cosine similarity between each worker and every other worker based on their office locations.

So, I iterated through the rows of the DataFrame, retrieving a single row from the DataFrame:

myIndex = 1
# zipWithIndex pairs each row with its index; keep only the row at myIndex
# (the original used Python 2 tuple-unpacking lambdas, which Python 3 rejects)
values = (ID_place_df.rdd.zipWithIndex()
            .filter(lambda rowIndex: rowIndex[1] == myIndex)
            .map(lambda rowIndex: rowIndex[0])
            .collect())

and then used map

cos_weight = ID_place_df.select("ID", "office_location").rdd\
    .map(lambda x: get_cosine(values, x[0], x[1]))

to calculate the cosine similarity between the extracted row and the whole DataFrame.
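
The post does not define get_cosine; purely for reference, a minimal sketch of such a helper, assuming the locations are comma-separated strings treated as binary features (the signature matches the map call above), could be:

import math

# Hypothetical sketch, not the original poster's implementation:
# cosine similarity between the collected reference row and one
# (ID, office_location) record, with each location a binary feature.
def get_cosine(values, worker_id, office_loc):
    ref = set(s.strip() for s in values[0][1].split(","))
    cur = set(s.strip() for s in office_loc.split(","))
    if not ref or not cur:
        return (worker_id, 0.0)
    return (worker_id, len(ref & cur) / math.sqrt(len(ref) * len(cur)))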

I do not think my approach is a good one: iterating through the rows of a DataFrame defeats the whole purpose of using Spark. Is there a better way to do it in pyspark? Kindly advise.

Answer

You can use the mllib package to compute the L2 norm of the TF-IDF of every row. Then multiply the table with itself to get the cosine similarity, as the dot product of pairs of L2-normalized rows:
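
To see why this works: once each row is L2-normalized, the plain dot product of two rows equals their cosine similarity. A quick numpy sanity check of that identity (not part of the Spark pipeline below):

import numpy as np

a = np.array([1.0, 2.0, 0.0])
b = np.array([2.0, 1.0, 1.0])

# cosine similarity from its definition
cos = a.dot(b) / (np.linalg.norm(a) * np.linalg.norm(b))
# dot product of the L2-normalized vectors
dot_normed = (a / np.linalg.norm(a)).dot(b / np.linalg.norm(b))

assert np.isclose(cos, dot_normed)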

1. RDD

rdd = sc.parallelize([[1, "Delhi, Mumbai, Gandhinagar"],[2, " Delhi, Mandi"], [3, "Hyderbad, Jaipur"]])




  • Compute the TF-IDF:

    # split each row's location string into a list of city tokens
    documents = rdd.map(lambda l: l[1].replace(" ", "").split(","))
    
    from pyspark.mllib.feature import HashingTF, IDF
    hashingTF = HashingTF()
    tf = hashingTF.transform(documents)
    


  • You can specify the number of features in HashingTF to make the feature matrix smaller (fewer columns), as in the sketch below.
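
    For instance (1024 here is an arbitrary illustrative value; the mllib default is 2**20; fewer buckets give smaller vectors but more hash collisions):

        hashingTF = HashingTF(numFeatures=1024)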

        tf.cache()
        idf = IDF().fit(tf)
        tfidf = idf.transform(tf)
    




    • Compute the L2 norm:

      from pyspark.mllib.feature import Normalizer
      labels = rdd.map(lambda l: l[0])
      features = tfidf
      
      normalizer = Normalizer()  # p=2.0 by default, i.e. the L2 norm
      data = labels.zip(normalizer.transform(features))
      


    • Compute the cosine similarity by multiplying the matrix with itself. Since the IDs start at 1 and matrix rows are 0-indexed, row and column 0 of the result below stay empty:

      from pyspark.mllib.linalg.distributed import IndexedRowMatrix
      # data is an RDD of (ID, normalized vector) pairs, which
      # IndexedRowMatrix accepts as (index, vector) rows
      mat = IndexedRowMatrix(data).toBlockMatrix()
      dot = mat.multiply(mat.transpose())
      dot.toLocalMatrix().toArray()
      
          array([[ 0.        ,  0.        ,  0.        ,  0.        ],
                 [ 0.        ,  1.        ,  0.10794634,  0.        ],
                 [ 0.        ,  0.10794634,  1.        ,  0.        ],
                 [ 0.        ,  0.        ,  0.        ,  1.        ]])
      

      OR: using a Cartesian product and the function dot on numpy arrays (this compares every pair of rows, so it is quadratic in the number of rows):

      data.cartesian(data)\
          .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\
          .sortByKey()\
          .collect()
      
          [((1, 1), 1.0),
           ((1, 2), 0.10794633570596117),
           ((1, 3), 0.0),
           ((2, 1), 0.10794633570596117),
           ((2, 2), 1.0),
           ((2, 3), 0.0),
           ((3, 1), 0.0),
           ((3, 2), 0.0),
           ((3, 3), 1.0)]
      


2. DataFrame

      Since you seem to be using DataFrames, you can use the spark ml package instead:

      import pyspark.sql.functions as psf
      df = rdd.toDF(["ID", "Office_Loc"])\
          .withColumn("Office_Loc", psf.split(psf.regexp_replace("Office_Loc", " ", ""), ','))
      




      • Compute the TF-IDF:

          from pyspark.ml.feature import HashingTF, IDF
          hashingTF = HashingTF(inputCol="Office_Loc", outputCol="tf")
          tf = hashingTF.transform(df)
          
          idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
          tfidf = idf.transform(tf)
          


      • Compute the L2 norm:

          from pyspark.ml.feature import Normalizer
          normalizer = Normalizer(inputCol="feature", outputCol="norm")
          data = normalizer.transform(tfidf)
          


      • Compute the matrix product:

          from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
          mat = IndexedRowMatrix(
              data.select("ID", "norm")\
                  .rdd.map(lambda row: IndexedRow(row.ID, row.norm.toArray()))).toBlockMatrix()
          dot = mat.multiply(mat.transpose())
          dot.toLocalMatrix().toArray()
          

          OR: using a join and a UDF for the function dot. The condition i.ID < j.ID keeps only the upper triangle, since cosine similarity is symmetric and the diagonal is always 1:

          from pyspark.sql.types import DoubleType

          dot_udf = psf.udf(lambda x, y: float(x.dot(y)), DoubleType())
          data.alias("i").join(data.alias("j"), psf.col("i.ID") < psf.col("j.ID"))\
              .select(
                  psf.col("i.ID").alias("i"), 
                  psf.col("j.ID").alias("j"), 
                  dot_udf("i.norm", "j.norm").alias("dot"))\
              .sort("i", "j")\
              .show()
          
              +---+---+-------------------+
              |  i|  j|                dot|
              +---+---+-------------------+
              |  1|  2|0.10794633570596117|
              |  1|  3|                0.0|
              |  2|  3|                0.0|
              +---+---+-------------------+
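
          If you need every ordered pair and the diagonal as well, a sketch along the same lines (the names upper and full are mine, not from the original answer) is to save the join result instead of showing it, then mirror it:

          upper = data.alias("i").join(data.alias("j"), psf.col("i.ID") < psf.col("j.ID"))\
              .select(
                  psf.col("i.ID").alias("i"),
                  psf.col("j.ID").alias("j"),
                  dot_udf("i.norm", "j.norm").alias("dot"))

          # mirror the upper triangle and add the diagonal (a normalized
          # row always has similarity 1.0 with itself)
          full = upper.union(upper.select("j", "i", "dot"))\
              .union(data.select(
                  psf.col("ID").alias("i"),
                  psf.col("ID").alias("j"),
                  psf.lit(1.0).alias("dot")))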
          


        • This tutorial lists different methods to multiply large-scale matrices: https://labs.yodas.com/large-scale-matrix-multiplication-with-pyspark-or-how-to-match-two-large-datasets-of-company-1be4b1b2871e
