RDD split and do aggregation on new RDDs


Question



I have an RDD of (String, String, Int).

1) I want to reduce it based on the first two strings

2) Then, based on the first string, I want to group the (String, Int) pairs and sort them

3) After sorting I need to group them into small groups, each containing n elements.
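The intended transformation can be sketched on plain Scala collections (no Spark involved; this sketch assumes n = 2, matching the expected output):

```scala
val data = Seq(
  ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1), ("c1", "a2", 1), ("c1", "b2", 1),
  ("c2", "a2", 1), ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1))

// Step 1: reduce on the first two strings.
val reduced = data
  .groupBy(x => (x._1, x._2))
  .toList
  .map { case ((c, a), xs) => (c, (a, xs.map(_._3).sum)) }

// Steps 2 and 3: group by the first string, sort, split into groups of n = 2.
val result = reduced
  .groupBy(_._1)
  .map { case (c, xs) => (c, xs.map(_._2).sorted.grouped(2).toList) }
```

For the sample data this yields `List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))` under `c1`, which is exactly the desired output; the question is how to do the same efficiently on a large RDD.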

I have written the code below. The problem is that in step 2 the number of elements for a single key is very large, and the reduceByKey((x, y) => x ++ y) takes a lot of time.

//Input
val data = Array(
  ("c1","a1",1), ("c1","b1",1), ("c2","a1",1),("c1","a2",1), ("c1","b2",1), 
  ("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))

val rdd = sc.parallelize(data)
val r1 = rdd.map(x => ((x._1, x._2), (x._3)))
val r2 = r1.reduceByKey((x, y) => x + y ).map(x => ((x._1._1), (x._1._2, x._2)))

// This is taking long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y) 

// from the list I will be doing grouping.
val r4 = r3.map(x => (x._1 , x._2.toList.sorted.grouped(2).toList)) 

The problem is that "c1" has a lot of unique entries like b1, b2, ... (millions of them), and the reduceByKey is killing time because all the values go to a single node. Is there a way to achieve this more efficiently?

// output
 Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))

Solution

There are at least a few problems with the way you group your data. The first one is introduced by

 mapValues(x => ArrayBuffer(x))

It creates a large number of mutable objects that provide no additional value, since you cannot leverage their mutability in the subsequent reduceByKey

reduceByKey((x, y) => x ++ y) 

where each ++ creates a new collection and neither argument can be safely mutated. Since reduceByKey applies map-side aggregation, the situation is even worse and pretty much creates GC hell.
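The cost of the immutable ++ pattern is easy to see outside Spark. The sketch below (plain Scala collections, not Spark code) contrasts reducing with ++, which allocates a fresh buffer at every step, against folding into a single shared mutable buffer:

```scala
import scala.collection.mutable.ArrayBuffer

val values = (1 to 1000).map(i => (s"a$i", 1))

// What mapValues(x => ArrayBuffer(x)).reduceByKey(_ ++ _) does per key:
// every ++ allocates a fresh ArrayBuffer, producing ~1000 throwaway collections.
val viaConcat = values.map(x => ArrayBuffer(x)).reduce((x, y) => x ++ y)

// Folding into one shared buffer appends in place: a single collection overall.
val viaFold = values.foldLeft(ArrayBuffer.empty[(String, Int)])(_ += _)
```

Both produce the same result, but the first allocates (and then garbage-collects) an intermediate buffer per element, which is what makes the pattern so expensive per key.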

Is there a way to achieve this more efficiently?

Unless you have some deeper knowledge about the data distribution which could be used to define a smarter partitioner, the simplest improvement is to replace mapValues + reduceByKey with a simple groupByKey:

val r3 = r2.groupByKey

It should also be possible to use a custom partitioner for both reduceByKey calls, and mapPartitions with preservesPartitioning instead of map:

class FirsElementPartitioner(partitions: Int)
    extends org.apache.spark.Partitioner {
  def numPartitions  = partitions
  def getPartition(key: Any): Int = {
    key.asInstanceOf[(Any, Any)]._1.## % numPartitions
  }
}

val r2 = r1
  .reduceByKey(new FirsElementPartitioner(8), (x, y) => x + y)
  .mapPartitions(iter => iter.map(x => ((x._1._1), (x._1._2, x._2))), true)

// No shuffle required here.
val r3 = r2.groupByKey

It requires only a single shuffle, and groupByKey is simply a local operation:

r3.toDebugString
// (8) MapPartitionsRDD[41] at groupByKey at <console>:37 []
//  |  MapPartitionsRDD[40] at mapPartitions at <console>:35 []
//  |  ShuffledRDD[39] at reduceByKey at <console>:34 []
//  +-(8) MapPartitionsRDD[1] at map at <console>:28 []
//     |  ParallelCollectionRDD[0] at parallelize at <console>:26 []
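The partitioner works because the target partition depends only on the first element of the key, so every ((c1, *), count) pair already sits in the same partition and groupByKey can run without moving data. That routing can be checked in plain Scala (the helper below mirrors getPartition; it is a sketch, not Spark code):

```scala
// Mirrors FirsElementPartitioner.getPartition: hash only the first element.
// Note: for keys with a negative hash code `%` can return a negative value;
// this sketch assumes non-negative hashes (production code would use a
// non-negative modulo).
def getPartition(key: Any, numPartitions: Int): Int =
  key.asInstanceOf[(Any, Any)]._1.## % numPartitions

// Different second elements, same first element => same partition.
val pA = getPartition(("c1", "a1"), 8)
val pB = getPartition(("c1", "b1"), 8)
```

Here pA == pB, which is exactly the invariant the later groupByKey relies on.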
