RDD split and do aggregation on new RDDs

Problem description
I have an RDD of (String, String, Int).
1) I want to reduce it based on the first two strings.
2) Then, based on the first string, I want to group the (String, Int) pairs and sort them.
3) After sorting, I need to split them into small groups, each containing n elements.

I have written the code below. The problem is that in step 2 the number of elements for a single key is very large, and the reduceByKey((x, y) => x ++ y) takes a lot of time.
```scala
// Input
val data = Array(
  ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1),
  ("c1", "a2", 1), ("c1", "b2", 1), ("c2", "a2", 1),
  ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1))
val rdd = sc.parallelize(data)

val r1 = rdd.map(x => ((x._1, x._2), x._3))
val r2 = r1.reduceByKey((x, y) => x + y).map(x => (x._1._1, (x._1._2, x._2))) // This is taking a long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y) // from this list I will do the grouping.
val r4 = r3.map(x => (x._1, x._2.toList.sorted.grouped(2).toList))
```
The problem is that "c1" has a lot of unique entries like b1, b2, ... (millions of them), and reduceByKey is killing time because all the values for a key go to a single node. Is there a way to achieve this more efficiently?
```scala
// Output
Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))),
      (c2,List(List((a1,2), (a2,1)))))
```
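As a sanity check of the intended semantics, the three steps can be sketched in plain Python without Spark (helper names like `grouped` are my own, chosen for illustration):

```python
from collections import defaultdict

data = [("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1),
        ("c1", "a2", 1), ("c1", "b2", 1), ("c2", "a2", 1),
        ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1)]

# Step 1: reduce on the first two strings, i.e. sum counts per (c, a) pair.
counts = defaultdict(int)
for c, a, n in data:
    counts[(c, a)] += n

# Step 2: regroup by the first string and sort the (a, count) pairs.
groups = defaultdict(list)
for (c, a), n in counts.items():
    groups[c].append((a, n))
for c in groups:
    groups[c].sort()

# Step 3: split each sorted list into chunks of size n (here n = 2).
def grouped(xs, n):
    return [xs[i:i + n] for i in range(0, len(xs), n)]

result = {c: grouped(vs, 2) for c, vs in groups.items()}
print(result)
```

Running this on the sample input reproduces the output shown above: `c1` maps to `[[("a1", 2), ("a2", 1)], [("b1", 2), ("b2", 1)]]` and `c2` to `[[("a1", 2), ("a2", 1)]]`.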
Solution

There are at least a few problems with the way you group your data. The first one is introduced by

```scala
mapValues(x => ArrayBuffer(x))
```

It creates a large number of mutable objects that provide no additional value, since you cannot leverage their mutability in the subsequent

```scala
reduceByKey((x, y) => x ++ y)
```

where each ++ creates a new collection and neither argument can be safely mutated. Since reduceByKey applies map-side aggregation, the situation is even worse and pretty much creates GC hell.

> Is there a way to achieve this more efficiently?
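The blow-up is easy to demonstrate outside Spark. In this plain-Python sketch (sizes invented for illustration), folding values together with concatenation, as the ++-based reduce does in the worst case, copies O(k²) elements for k values, while a single in-place grouping pass touches each element once:

```python
# Folding with concatenation (like reduceByKey((x, y) => x ++ y)) allocates
# a fresh collection at every step, recopying everything seen so far.
k = 1000
copies = 0
acc = []
for v in range(k):
    copies += len(acc) + 1  # elements copied by `acc + [v]`
    acc = acc + [v]         # new list each time; the old one becomes garbage

print(copies)  # k*(k+1)/2 = 500500 element copies for only 1000 values

# A single grouping pass (roughly what groupByKey does locally) appends in place:
grouped = []
for v in range(k):
    grouped.append(v)       # amortized O(1); k total copies

assert acc == grouped
```

The same asymmetry holds on the executors: each discarded intermediate collection is extra allocation and GC pressure.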
Unless you have some deeper knowledge about the data distribution which could be used to define a smarter partitioner, the simplest improvement is to replace mapValues + reduceByKey with a plain groupByKey:

```scala
val r3 = r2.groupByKey
```
It should also be possible to use a custom partitioner for both reduceByKey calls, and mapPartitions with preservesPartitioning instead of map:

```scala
class FirsElementPartitioner(partitions: Int)
    extends org.apache.spark.Partitioner {
  def numPartitions = partitions
  def getPartition(key: Any): Int = {
    key.asInstanceOf[(Any, Any)]._1.## % numPartitions
  }
}

val r2 = r1
  .reduceByKey(new FirsElementPartitioner(8), (x, y) => x + y)
  .mapPartitions(iter => iter.map(x => (x._1._1, (x._1._2, x._2))), true) // No shuffle required here.
val r3 = r2.groupByKey
```
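The idea behind the partitioner, hashing only the first element of the composite key, can be sketched in plain Python (the modulo mirrors `._1.## % numPartitions`; note that a production Spark partitioner must also guard against negative hash codes, which this sketch ignores):

```python
NUM_PARTITIONS = 8

def get_partition(key):
    # Partition a composite key (c, a) by its first element only,
    # so all keys sharing the same `c` land in the same partition.
    first, _ = key
    return hash(first) % NUM_PARTITIONS

keys = [("c1", "a1"), ("c1", "b1"), ("c1", "b2"), ("c2", "a1")]
parts = {k: get_partition(k) for k in keys}

# Every ("c1", *) key maps to one partition, so a later group-by on "c1"
# finds all of its values locally and needs no second shuffle.
c1_partitions = {p for (c, _), p in parts.items() if c == "c1"}
assert len(c1_partitions) == 1
```

This is exactly why the mapPartitions call above can pass `preservesPartitioning = true`: rekeying from (c, a) to c does not move any record across partitions.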
It requires only a single shuffle, and groupByKey is simply a local operation:

```scala
r3.toDebugString
// (8) MapPartitionsRDD[41] at groupByKey at <console>:37 []
//  |  MapPartitionsRDD[40] at mapPartitions at <console>:35 []
//  |  ShuffledRDD[39] at reduceByKey at <console>:34 []
//  +-(8) MapPartitionsRDD[1] at map at <console>:28 []
//  |  ParallelCollectionRDD[0] at parallelize at <console>:26 []
```