Is it possible to join two RDDs' values to avoid expensive shuffling?


Problem Description


I have two RDDs, both with two columns as (K, V). In the source files for these RDDs the keys appear one under the other, and each row assigns a different, distinct value to its key. The text files used to create the RDDs are given at the bottom of this post.

The keys are completely different in the two RDDs. I would like to join the two RDDs based on their values and find out how many common values exist for each pair of keys, e.g. a result such as (1-5, 10), meaning that key "1" from RDD1 and key "5" from RDD2 share 10 values in common.

I work on a single machine with 256 GB of RAM and 72 cores. One text file is 500 MB while the other is 3 MB.

Here is my code:

val conf = new SparkConf().setAppName("app").setMaster("local[*]")
  .set("spark.shuffle.spill", "true")
  .set("spark.shuffle.memoryFraction", "0.4")
  .set("spark.executor.memory", "128g")
  .set("spark.driver.maxResultSize", "0")

// RDD1 reads t1.txt as (key, value); RDD2 reads t2.txt as (value, key)
val RDD1 = sc.textFile("\\t1.txt", 1000).map { line => val s = line.split("\t"); (s(0), s(1)) }

val RDD2 = sc.textFile("\\t2.txt", 1000).map { line => val s = line.split("\t"); (s(1), s(0)) }

// emp and emp_new are not defined in the original snippet; presumably emp stands for RDD1
// and emp_new for RDD2 (the smaller RDD, which is the one being broadcast)
val emp = RDD1
val emp_new = RDD2

// broadcast a map from each value to all RDD2 keys that carry that value
val emp_newBC = sc.broadcast(emp_new.groupByKey.collectAsMap)

// for every (key, value) pair of RDD1, look the value up in the broadcast map and
// emit one ("rdd1Key-rdd2Key", 1) record per match
val joined = emp.mapPartitions(iter => for {
  (k, v1) <- iter
  v2 <- emp_newBC.value.getOrElse(v1, Iterable())
} yield (s"$k-$v2", 1))

joined.foreach(println)

val result = joined.reduceByKey((a, b) => a + b)

I try to manage this issue by using a broadcast variable, as seen in the script above. If I join RDD2 (which has 250K rows) with itself, the pairs show up in the same partitions, so little shuffling takes place and it takes about 3 minutes to get the results. However, when joining RDD1 against RDD2, the pairs are scattered across the partitions, resulting in a very expensive shuffle, and it always ends up giving:

ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 168591 ms

Based on these results:

  • Should I try to split the text file used to create RDD1 into smaller chunks and join those chunks with RDD2 separately?

  • Is there another way of joining two RDDs based on their value fields? If I turn the original values into keys and join them with the join function, the pairs are again scattered across partitions, which again results in a very expensive reduceByKey operation. e.g.

    // both RDDs keyed by their value column: (value, key)
    val RDD1 = sc.textFile("\\t1.txt", 1000).map { line => val s = line.split("\t"); (s(1), s(0)) }
    val RDD2 = sc.textFile("\\t2.txt", 1000).map { line => val s = line.split("\t"); (s(1), s(0)) }

    // join on the value, then count how many values each (key1, key2) pair shares
    RDD1.join(RDD2).map(line => (line._2, 1)).reduceByKey((a, b) => a + b)

PSEUDO DATA SAMPLE:

KEY VALUE
1   13894
1   17376
1   15688
1   22434
1   2282
1   14970
1   11549
1   26027
1   2895
1   15052
1   20815
2   9782
2   3393
2   11783
2   22737
2   12102
2   10947
2   24343
2   28620
2   2486
2   249
2   3271
2   30963
2   30532
2   2895
2   13894
2   874
2   2021
3   6720
3   3402
3   25894
3   1290
3   21395
3   21137
3   18739
...

A SMALL EXAMPLE

RDD1

2   1
2   2
2   3
2   4
2   5
2   6
3   1
3   6
3   7
3   8
3   9
4   3
4   4
4   5
4   6

RDD2

21  1
21  2
21  5
21  11
21  12
21  10
22  7
22  8
22  13
22  9
22  11

JOIN RESULTS BASED ON THIS DATA:

(3-22,1)
(2-21,1)
(3-22,1)
(2-21,1)
(3-22,1)
(4-21,1)
(2-21,1)
(3-21,1)
(3-22,1)
(3-22,1)
(2-21,1)
(3-22,1)
(2-21,1)
(4-21,1)
(2-21,1)
(3-21,1)

REDUCEBYKEY RESULTS:

(4-21,1)
(3-21,1)
(2-21,3)
(3-22,3)

Solution

Have you looked at using a cartesian join? You could maybe try something like the following:

val rdd1 = sc.parallelize(for { x <- 1 to 3; y <- 1 to 5 } yield (x, y)) // sample RDD
val rdd2 = sc.parallelize(for { x <- 1 to 3; y <- 3 to 7 } yield (x, y)) // sample RDD with slightly displaced values from the first

val g1 = rdd1.groupByKey()
val g2 = rdd2.groupByKey()

val cart = g1.cartesian(g2).map { case ((key1, values1), (key2, values2)) => 
             ((key1, key2), (values1.toSet & values2.toSet).size) 
           }

When I try running the above example in a cluster, I see the following:

scala> rdd1.take(5).foreach(println)
...
(1,1)
(1,2)
(1,3)
(1,4)
(1,5)
scala> rdd2.take(5).foreach(println)
...
(1,3)
(1,4)
(1,5)
(1,6)
(1,7)
scala> cart.take(5).foreach(println)
...
((1,1),3)
((1,2),3)
((1,3),3)
((2,1),3)
((2,2),3)

The result indicates that for each (key1, key2) pair there are 3 matching elements between the two sets. Note that the result is always 3 here because the ranges used to initialize the input tuples overlap by exactly 3 elements.
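
If the output then needs to match the "key1-key2" string format described in the question, the pairs can be reshaped afterwards. Here is a small follow-up sketch (assuming the cart RDD defined above; the filter step is optional and simply drops key pairs that share no values):

val formatted = cart
  .filter { case (_, commonCount) => commonCount > 0 }                        // optional: drop pairs with no overlap
  .map { case ((key1, key2), commonCount) => (s"$key1-$key2", commonCount) }  // e.g. ((1,2),3) becomes ("1-2",3)

formatted.take(5).foreach(println)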

The cartesian transformation does not cause a shuffle either, since it just iterates over the elements of each RDD and produces a cartesian product. You can see this by calling toDebugString on an example.

scala> val carts = rdd1.cartesian(rdd2)
carts: org.apache.spark.rdd.RDD[((Int, Int), (Int, Int))] = CartesianRDD[9] at cartesian at <console>:25

scala> carts.toDebugString
res11: String =
(64) CartesianRDD[9] at cartesian at <console>:25 []
 |   ParallelCollectionRDD[1] at parallelize at <console>:21 []
 |   ParallelCollectionRDD[2] at parallelize at <console>:21 []
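
To connect this back to the question's setup, the same grouped-cartesian idea could be applied directly to the two tab-separated files. The sketch below is untested; it assumes the file paths from the question, that both files are laid out as key<TAB>value as in the pseudo data sample, and the names fileRdd1, fileRdd2, grouped1, grouped2 and commonCounts are purely illustrative:

// read both files as (key, value) pairs
val fileRdd1 = sc.textFile("\\t1.txt", 1000).map { line => val s = line.split("\t"); (s(0), s(1)) }
val fileRdd2 = sc.textFile("\\t2.txt", 1000).map { line => val s = line.split("\t"); (s(0), s(1)) }

// group each RDD by key so every key carries the collection of its values
val grouped1 = fileRdd1.groupByKey()
val grouped2 = fileRdd2.groupByKey()

// compare every key of the first RDD with every key of the second and
// count the values they have in common
val commonCounts = grouped1.cartesian(grouped2).map { case ((key1, values1), (key2, values2)) =>
  (s"$key1-$key2", (values1.toSet & values2.toSet).size)
}

commonCounts.take(5).foreach(println)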
