Spark Python: How to calculate Jaccard Similarity between each line within an RDD?

Question

I have a table of around 50k distinct rows and 2 columns. You can think of each row as a movie and the columns as attributes of that movie - "ID": the id of that movie, "Tags": some content tags of the movie, in the form of a list of strings for each movie.

The data looks something like this:

movie_1, ['romantic','comedy','English']; movie_2, ['action','kongfu','Chinese']

My goal is to first calculate the Jaccard similarity between each pair of movies based on their corresponding tags; once that's done, I will be able to know, for each movie (say movie_1), the top 5 movies most similar to it. And I want the top 5 results not just for movie_1, but for all of the movies.
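
For reference, the similarity here is just the size of the intersection of the two tag sets divided by the size of their union. A tiny worked example in plain Python (movie_3 and its tags are borrowed from the answer's example further down):

# Jaccard similarity = |A intersection B| / |A union B|
tags_movie_1 = {'romantic', 'comedy', 'English'}
tags_movie_3 = {'romantic', 'action'}  # tags borrowed from the answer's example below
similarity = len(tags_movie_1 & tags_movie_3) / len(tags_movie_1 | tags_movie_3)  # 1 / 4 = 0.25
distance = 1.0 - similarity  # 0.75, matching the distCol values in the answer's output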

I have tried using Python to solve the problem; however, the run time is a big challenge here. Even using multiprocessing on 6 cores, the total run time still lasted over 20 hours.

Python code below:

import pandas as pd
from collections import Counter
import numpy as np
from multiprocessing import Pool
import time

# load the movie table; tag_name is assumed to already hold a list of tags per movie
col_names = ['movie_id', 'tag_name']
df = pd.read_csv("movies.csv", names=col_names)
movie_ids = df['movie_id'].tolist()
tag_list = df['tag_name'].tolist()

def jaccard_similarity(string1, string2):
    # Jaccard similarity: |intersection| / |union| of the two tag collections
    intersection = set(string1).intersection(set(string2))
    union = set(string1).union(set(string2))
    return len(intersection) / float(len(union))

def jc_results(movie_id):
    # compare this movie's tags against every other movie and keep the 10 best matches
    result = Counter()
    tag_1 = tag_list[movie_ids.index(movie_id)]
    for that_index, another_id in enumerate(movie_ids):
        if another_id == movie_id:
            continue
        tag_2 = tag_list[that_index]
        result[(movie_id, another_id)] = jaccard_similarity(tag_1, tag_2)
    return result.most_common(10)


# fan the per-movie computation out across 6 worker processes
pool = Pool(6)
results = {}
for movie_id in movie_ids:
    results[movie_id] = pool.apply_async(jc_results, args=(movie_id,))
pool.close()
pool.join()
for movie_id, res in results.items():
    results[movie_id] = res.get()

Then I wanted to switch to Pyspark; however, I am still very new to Spark with Python and got stuck after writing a few lines. Actually, the only progress I have made was reading the data into an RDD using sc.textFile... I have read the existing posts, but they are all using Scala. It would be great if anyone could help or provide any guidance with Pyspark. Thanks a lot!
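
That reading step looked roughly like this (a minimal sketch; the file name and the split-on-first-comma parsing are assumptions based on the sample data above):

raw = sc.textFile("movies.csv")
# split each line into (movie_id, tag_string) on the first comma only,
# since the tag list itself contains commas
pairs = raw.map(lambda line: line.split(",", 1))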

Answer

You could try a solution similar to this stackoverflow answer, though since your data is already tokenized (a list of strings), you wouldn't need to do that step or the ngram step.

I'll also mention that approxSimilarityJoin in pyspark calculates the Jaccard distance rather than the Jaccard similarity, but you can just subtract it from 1 to convert back to the similarity if you need that in particular.

Your code would end up looking similar to:

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, MinHashLSH
import pyspark.sql.functions as f

db = spark.createDataFrame([
        ('movie_1', ['romantic','comedy','English']),
        ('movie_2', ['action','kongfu','Chinese']),
        ('movie_3', ['romantic', 'action'])
    ], ['movie_id', 'genres'])


# hash each tag list into a term-frequency vector, then build MinHash signatures for LSH
model = Pipeline(stages=[
        HashingTF(inputCol="genres", outputCol="vectors"),
        MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=10)
    ]).fit(db)

db_hashed = model.transform(db)

# self-join: keep pairs whose approximate Jaccard distance is below the 0.9 threshold
db_matches = model.stages[-1].approxSimilarityJoin(db_hashed, db_hashed, 0.9)

#show all matches (including duplicates)
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).show()

#show non-duplicate matches
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).filter('movie_id_A < movie_id_B').show()

With the corresponding output:

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
|   movie_3|   movie_3|    0.0|
|   movie_1|   movie_3|   0.75|
|   movie_2|   movie_3|   0.75|
|   movie_1|   movie_1|    0.0|
|   movie_2|   movie_2|    0.0|
|   movie_3|   movie_2|   0.75|
|   movie_3|   movie_1|   0.75|
+----------+----------+-------+

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
|   movie_1|   movie_3|   0.75|
|   movie_2|   movie_3|   0.75|
+----------+----------+-------+
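
Since the question asks for the top 5 most similar movies per movie, one way to post-process db_matches is to convert the distance back to a similarity (1 - distCol, as mentioned above) and rank within each movie. A sketch of that extra step (not part of the original answer):

from pyspark.sql import Window

# convert Jaccard distance back to similarity and keep the 5 closest matches per movie
pairs = db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                          f.col('datasetB.movie_id').alias('movie_id_B'),
                          (1 - f.col('distCol')).alias('jaccard_sim')) \
                  .filter('movie_id_A != movie_id_B')

w = Window.partitionBy('movie_id_A').orderBy(f.desc('jaccard_sim'))
pairs.withColumn('rank', f.row_number().over(w)).filter('rank <= 5').show()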
