Spark difference between reduceByKey vs groupByKey vs aggregateByKey vs combineByKey
Question
Can anyone explain the difference between reduceByKey, groupByKey, aggregateByKey and combineByKey? I have read the documentation on this, but couldn't understand the exact differences.
It would be great if you could explain it with examples.
Answer
groupByKey:
Syntax:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" ") )
.map(word => (word,1))
.groupByKey()
.map { case (word, counts) => (word, counts.sum) }
groupByKey can cause out-of-disk problems, because every value is sent over the network and collected on the reduce workers.
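Since the Scala snippet needs a Spark cluster to run, here is a plain-Python sketch of the shuffle behaviour, using a made-up two-partition dataset (the partition contents are hypothetical, chosen only to illustrate the point):

```python
from collections import defaultdict

# Two hypothetical partitions of (word, 1) pairs, as produced by the map step.
partitions = [
    [("foo", 1), ("foo", 1), ("bar", 1)],
    [("foo", 1), ("bar", 1)],
]

# groupByKey ships every single pair across the network and only then groups
# on the reducer: all 5 records are shuffled, regardless of how few distinct
# keys there are.
shuffled = [pair for part in partitions for pair in part]
grouped = defaultdict(list)
for word, one in shuffled:
    grouped[word].append(one)

counts = {word: sum(ones) for word, ones in grouped.items()}
print(counts)        # {'foo': 3, 'bar': 2}
print(len(shuffled)) # 5 records crossed the network
```

With many repeated keys, the list of ungrouped records that must travel over the network (and be held on the reducers) is what causes the memory/disk pressure.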
reduceByKey:
Syntax:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((x,y)=> (x+y))
Data is combined at each partition, so only one output per key per partition is sent over the network. reduceByKey requires combining all your values into another value of the exact same type.
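The map-side combine can be sketched in plain Python on the same hypothetical two-partition dataset as above; note how fewer records cross the network than with groupByKey:

```python
from collections import defaultdict

# The same hypothetical partitions of (word, 1) pairs.
partitions = [
    [("foo", 1), ("foo", 1), ("bar", 1)],
    [("foo", 1), ("bar", 1)],
]

def combine_partition(part):
    # Map-side combine: sum the counts within the partition first,
    # producing at most one record per key per partition.
    acc = defaultdict(int)
    for word, n in part:
        acc[word] += n
    return list(acc.items())

combined = [combine_partition(p) for p in partitions]
shuffled = [pair for part in combined for pair in part]

# The reducer merges the pre-combined partial sums with the same function.
totals = defaultdict(int)
for word, n in shuffled:
    totals[word] += n
print(dict(totals))  # {'foo': 3, 'bar': 2}
print(len(shuffled)) # 4 records crossed the network, vs 5 for groupByKey
```

The result is identical, but the shuffle carries partial sums instead of raw records, which is why reduceByKey scales better on skewed or high-volume keys.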
aggregateByKey:
Same as reduceByKey, but it also takes an initial value.
Takes 3 parameters as input:
i. an initial value
ii. combiner logic
iii. sequence op logic
Example:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
Output: Aggregate By Key sum results: bar -> 3, foo -> 5
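The same aggregateByKey computation can be traced in plain Python, assuming the 8 records land in two partitions as sketched below (the partition split is hypothetical; Spark decides it at runtime):

```python
from collections import defaultdict

# The data from the Scala example, split into two hypothetical partitions.
partitions = [
    [("foo", "A"), ("foo", "A"), ("foo", "A"), ("foo", "A")],
    [("foo", "B"), ("bar", "C"), ("bar", "D"), ("bar", "D")],
]

initial = 0
seq_op = lambda n, v: n + 1       # fold a String value into the Int accumulator
comb_op = lambda p1, p2: p1 + p2  # merge the per-partition Int accumulators

# Within each partition, seq_op folds values into the accumulator,
# starting from the initial value. Note the value type (String) differs
# from the accumulator type (Int) -- the flexibility reduceByKey lacks.
per_partition = []
for part in partitions:
    acc = defaultdict(lambda: initial)
    for k, v in part:
        acc[k] = seq_op(acc[k], v)
    per_partition.append(acc)

# Across partitions, comb_op merges the accumulators.
result = defaultdict(lambda: initial)
for acc in per_partition:
    for k, n in acc.items():
        result[k] = comb_op(result[k], n)
print(dict(result))  # {'foo': 5, 'bar': 3}
```

This is why aggregateByKey is more general than reduceByKey: the sequence op can produce an accumulator of a different type from the values.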
combineByKey:
Takes 3 parameters as input:
- initial value: unlike aggregateByKey, it need not be a constant; we can pass a function that returns a new value.
- merging function
- combine function
Example:
val result = rdd.combineByKey(
  (v: Int) => (v, 1),
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map { case (k, v) => (k, v._1 / v._2.toDouble) }
result.collect.foreach(println)
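The per-key average that this combineByKey computes can be traced in plain Python, on a small hypothetical two-partition dataset of (key, Int) pairs:

```python
# Hypothetical (key, Int) pairs in two partitions; the goal is the per-key mean.
partitions = [
    [("a", 2), ("a", 4)],
    [("a", 6), ("b", 10)],
]

create = lambda v: (v, 1)                                  # first value seen in a partition -> (sum, count)
merge_value = lambda acc, v: (acc[0] + v, acc[1] + 1)      # fold another value into (sum, count)
merge_combiners = lambda a, b: (a[0] + b[0], a[1] + b[1])  # merge per-partition (sum, count) pairs

# Within each partition: create an accumulator on first sight of a key
# (this is what replaces aggregateByKey's constant initial value),
# otherwise merge the value into the existing accumulator.
per_partition = []
for part in partitions:
    acc = {}
    for k, v in part:
        acc[k] = merge_value(acc[k], v) if k in acc else create(v)
    per_partition.append(acc)

# Across partitions: merge the (sum, count) accumulators per key.
totals = {}
for acc in per_partition:
    for k, sc in acc.items():
        totals[k] = merge_combiners(totals[k], sc) if k in totals else sc

means = {k: s / c for k, (s, c) in totals.items()}
print(means)  # {'a': 4.0, 'b': 10.0}
```

The create function runs once per key per partition, which is what lets combineByKey build the initial accumulator from the first value itself rather than from a fixed constant.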
reduceByKey, aggregateByKey and combineByKey are preferred over groupByKey.
Reference: Avoid groupByKey