Jaccard Similarity of an RDD with the help of Spark and Scala without Cartesian?


Problem Description

I am working on pair RDDs. My aim is to calculate the Jaccard similarity between the sets of RDD values and cluster them according to a Jaccard similarity threshold. The structure of my RDD is:

val a= [Key,Set(String)]   //Pair RDD

For example:
India,[Country,Place,....]
USA,[Country,State,..]
Berlin,[City,PopulatedPlace,..]
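
For reference, a pair RDD with this shape could be constructed as follows. This is only an illustrative sketch: the data and the sparkSession name are assumptions, not the asker's actual input.

import org.apache.spark.rdd.RDD

//Hypothetical construction of the pair RDD shown above; the real "a"
//comes from the asker's own pipeline.
val a: RDD[(String, Set[String])] = sparkSession.sparkContext.parallelize(Seq(
  ("India",  Set("Country", "Place")),
  ("USA",    Set("Country", "State")),
  ("Berlin", Set("City", "PopulatedPlace"))
))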

After finding the Jaccard similarity, I will cluster the similar entities into one cluster. In the above example, India and USA will be clustered into one cluster based on some threshold value, whereas Berlin will be in another cluster.

So I took the Cartesian product of RDD a:

val filterOnjoin = a.cartesian(a).filter(f =>
  (!f._1._1.toString().contentEquals(f._2._1.toString())))
//Cartesian product of rdd a, filtering out rows that have the same key
//at both positions.
//e.g. ((India,Set[Country,Place,....]),(USA,Set[Country,State,..]))

and compared the sets of values using Jaccard similarity:

val Jsim = filterOnjoin.map(f => (f._1._1, (f._2._1,
  Similarity.sim(f._1._2, f._2._2)))) //calculating Jaccard similarity
//e.g. (India,(USA,0.8))
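
The body of Similarity.sim is not shown in the question. A minimal sketch, assuming it computes plain Jaccard similarity |A ∩ B| / |A ∪ B| over the two string sets, could look like this:

//Hypothetical implementation of Similarity.sim (an assumption, not code
//from the question): Jaccard similarity of two sets.
object Similarity {
  def sim(a: Set[String], b: Set[String]): Double = {
    val unionSize = (a union b).size
    if (unionSize == 0) 0.0
    else (a intersect b).size.toDouble / unionSize
  }
}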

The code runs fine on a smaller dataset. As the size of the dataset increases, the Cartesian product takes too much time. For 100 MB of data (the size of RDD "a"), the shuffle read is around 25 GB. For 3.5 GB of data, it is in the TB range.

I have gone through various links, such as Spark tuning methods and some posts on Stack Overflow. But most of the posts say to broadcast the smaller RDD. Here, however, both RDDs are the same size, and both are big.

Links which I followed:
Spark: produce RDD[(X, X)] of all possible combinations from RDD[X]
Spark partitioning is very slow and shuffles too much data
Map key, value pairs based on the similarity of their values in Spark

I am new to Spark and Scala. I am unable to think beyond the Cartesian product, which is the bottleneck here. Is it possible to solve this problem without the Cartesian product?

Recommended Answer

As the Cartesian product is an expensive operation on an RDD, I tried to solve the above problem by using the HashingTF and MinHashLSH libraries present in Spark MLlib to find the Jaccard similarity. Steps to find the Jaccard similarity in RDD "a" mentioned in the question:

  • Convert the RDD into a DataFrame

 import sparkSession.implicits._  
 val dfA = a.toDF("id", "values")
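
 If a.toDF cannot encode the Set[String] values directly (this depends on the Spark version), and since HashingTF expects an array-typed input column, one hedged alternative is to convert the sets to Seq first:

 //Assumption, not part of the original answer: convert the Set values to Seq
 //so the "values" column is array-typed for HashingTF.
 val dfA = a.mapValues(_.toSeq).toDF("id", "values")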

  • Create the feature vector with the help of HashingTF

      import org.apache.spark.ml.feature.HashingTF

      val hashingTF = new HashingTF()
        .setInputCol("values").setOutputCol("features").setNumFeatures(1048576)
    

  • Feature transformation

    val featurizedData = hashingTF.transform(dfA) //Feature Transformation  
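
    As an optional sanity check (not in the original answer), the hashed term-frequency vectors can be inspected before fitting the LSH model:

    //Peek at the sparse feature vectors produced by HashingTF.
    featurizedData.select("id", "features").show(truncate = false)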
    

  • Create the MinHash table. The higher the number of hash tables, the more accurate the results, but the higher the communication cost and run time.

     import org.apache.spark.ml.feature.MinHashLSH

     val mh = new MinHashLSH()
       .setNumHashTables(3)
       .setInputCol("features")
       .setOutputCol("hashes")
    

  • Approximate similarity join takes two datasets and approximately returns pairs of rows whose distance is smaller than a user-defined threshold. It supports both joining two different datasets and self-joining; self-joining will produce some duplicate pairs.

      val model = mh.fit(featurizedData)
      //Approximately self-join featurizedData, keeping pairs with a Jaccard
      //distance smaller than 0.45
      val dffilter = model.approxSimilarityJoin(featurizedData, featurizedData,
                       0.45)
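
      The join result exposes the two matched rows as struct columns datasetA and datasetB plus a distance column (distCol), where Jaccard similarity = 1 - distance. A sketch of the post-processing (an addition of mine, not part of the original answer) that pulls out the id pairs and drops self-pairs and duplicate orderings:

      import org.apache.spark.sql.functions.col

      //Extract (idA, idB, distCol) from the self-join result; keeping only
      //idA < idB removes self-pairs and the duplicate (A,B)/(B,A) orderings.
      val pairs = dffilter
        .select(
          col("datasetA.id").alias("idA"),
          col("datasetB.id").alias("idB"),
          col("distCol"))
        .filter(col("idA") < col("idB"))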
    

  • Since in Spark we have to do manual optimization in our code, such as setting the number of partitions and the persist level, I configured these parameters as well:

    • Changing the storage level from persist() to persist(StorageLevel.MEMORY_AND_DISK) helped me get rid of the OOM error.
    • Also, while doing the join operation, I re-partitioned the data according to the RDD size. On a 16.6 GB dataset, I was using 200 partitions for a simple join operation; increasing this to 600 also solved my OOM-related problems. A sketch of both tweaks follows this list.
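
    A minimal sketch of those two tweaks (which DataFrame to persist or repartition, and the exact numbers, depend on the actual pipeline; 600 partitions is the value reported above for the 16.6 GB dataset):

    import org.apache.spark.storage.StorageLevel

    //Allow spilling to disk instead of failing with OOM.
    featurizedData.persist(StorageLevel.MEMORY_AND_DISK)

    //Raise the partition count before the join; tune per dataset size.
    val repartitioned = featurizedData.repartition(600)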

    PS: the constant parameters setNumFeatures(1048576) and setNumHashTables(3) were configured while experimenting on the 16.6 GB dataset. You can increase or decrease these values according to your dataset. The number of partitions also depends on your dataset size. With these optimizations, I got my desired results.


    Useful links:
    [https://spark.apache.org/docs/2.2.0/ml-features.html#locality-sensitive-hashing]
    [https://eng.uber.com/lsh/]
    [https://data-flair.training/blogs/limitations-of-apache-spark/]
