Spark difference between reduceByKey vs groupByKey vs aggregateByKey vs combineByKey


Question

Can anyone explain the difference between reduceByKey, groupByKey, aggregateByKey and combineByKey? I have read the documentation on this, but couldn't understand the exact differences.

If you can explain it with examples, that would be great.

Answer

groupByKey:

语法:

sparkContext.textFile("hdfs://")
                    .flatMap(line => line.split(" "))
                    .map(word => (word, 1))
                    .groupByKey()
                    .map { case (word, counts) => (word, counts.sum) }

groupByKey can cause out-of-disk problems, because all values for a key are sent over the network and collected on the reduce workers.
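To see the shuffle cost concretely, here is a minimal sketch using plain Scala collections (no Spark; the data and the two-partition split are made up) that counts how many records each strategy would send over the network:

```scala
// Hypothetical data: (word, 1) pairs split across two simulated partitions.
val part1 = Seq(("foo", 1), ("bar", 1), ("foo", 1))
val part2 = Seq(("foo", 1), ("bar", 1))

// groupByKey-style shuffle: every pair crosses the network before any reduction.
val shuffledByGroup = (part1 ++ part2).size

// reduceByKey-style: combine within each partition first (map-side combine),
// so at most one record per key per partition is shuffled.
def localCombine(part: Seq[(String, Int)]): Map[String, Int] =
  part.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

val shuffledByReduce = localCombine(part1).size + localCombine(part2).size
```

Here groupByKey would shuffle all 5 records, while the map-side combine cuts it to 4 (one per key per partition); on real word-count data the gap grows with how often keys repeat within a partition.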

reduceByKey:

语法:

sparkContext.textFile("hdfs://")
                    .flatMap(line => line.split(" "))
                    .map(word => (word, 1))
                    .reduceByKey((x, y) => x + y)

Data is combined within each partition first, so only one output per key per partition is sent over the network. reduceByKey requires a function that combines all your values into another value of the exact same type.
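Because the reduce function has type (V, V) => V, you cannot change the value type mid-reduction; the usual workaround is to map values into the target type first. A plain-Scala sketch (no Spark, made-up data) of averaging via a same-type (sum, count) accumulator:

```scala
// Made-up data: keys with integer values.
val pairs = Seq(("a", 10), ("a", 20), ("b", 5))

// Map each value to (sum, count) so the reduce function keeps one type: (Int, Int).
val summed = pairs
  .map { case (k, v) => (k, (v, 1)) }
  .groupBy(_._1)
  .map { case (k, vs) =>
    (k, vs.map(_._2).reduce((x, y) => (x._1 + y._1, x._2 + y._2)))
  }

// Divide sum by count per key.
val averages = summed.map { case (k, (s, c)) => (k, s.toDouble / c) }
```

In Spark the same idea is `rdd.mapValues(v => (v, 1)).reduceByKey(...)`; aggregateByKey and combineByKey exist precisely so you can skip that extra map step.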

aggregateByKey:

Same as reduceByKey, but it takes an initial value.

It takes 3 parameters as input: i. an initial (zero) value, ii. a sequence op that merges a value into an accumulator within a partition, iii. a combiner op that merges accumulators across partitions.

示例:

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
// Create key-value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)

Output (aggregate-by-key sum results): bar -> 3, foo -> 5
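The two-phase semantics above can be checked without a cluster. A plain-Scala simulation of that aggregateByKey call (the four/four partition split is made up), applying the sequence op within each simulated partition and the combiner op across them:

```scala
// Same data as the Spark example above.
val keysWithValues = Seq(
  ("foo", "A"), ("foo", "A"), ("foo", "A"), ("foo", "A"),
  ("foo", "B"), ("bar", "C"), ("bar", "D"), ("bar", "D"))

val initialCount = 0
val addToCounts = (n: Int, v: String) => n + 1          // sequence op (within a partition)
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2  // combiner op (across partitions)

// Split into two hypothetical partitions.
val (pA, pB) = keysWithValues.splitAt(4)

// Phase 1: fold each partition's values into an Int accumulator per key.
def aggregatePartition(part: Seq[(String, String)]): Map[String, Int] =
  part.groupBy(_._1).map { case (k, vs) =>
    (k, vs.map(_._2).foldLeft(initialCount)(addToCounts))
  }

// Phase 2: merge the per-partition accumulators.
val countByKey = (aggregatePartition(pA).toSeq ++ aggregatePartition(pB).toSeq)
  .groupBy(_._1)
  .map { case (k, vs) => (k, vs.map(_._2).reduce(sumPartitionCounts)) }
```

Note that the sequence op takes an Int accumulator and a String value, i.e. the output type (Int) differs from the value type (String); that is exactly what reduceByKey cannot do.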

combineByKey:

It takes 3 parameters as input:

  1. Initial value: unlike aggregateByKey, it need not always be a constant; we can pass a function that returns a new value (a create-combiner applied to the first value seen for a key).
  2. Merging function (folds a value into the accumulator within a partition)
  3. Combine function (merges accumulators across partitions)

示例:

val result = rdd.combineByKey(
                        (v) => (v, 1),
                        (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
                        (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
                        ).map { case (k, v) => (k, v._1 / v._2.toDouble) }
        result.collect.foreach(println)
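This computes a per-key average. The same logic can be checked without Spark; a plain-Scala simulation of the three functions (made-up data, a single simulated partition, so the combine function is shown but not exercised):

```scala
// Made-up input data standing in for rdd.
val rddData = Seq(("a", 10), ("a", 20), ("b", 6))

val createCombiner = (v: Int) => (v, 1)                                 // first value seen for a key
val mergeValue = (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1)  // fold later values in
val mergeCombiners =                                                    // would merge per-partition accumulators
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)

// Per key: seed the accumulator from the first value, fold in the rest, then average.
val avg = rddData.groupBy(_._1).map { case (k, kvs) =>
  val vals = kvs.map(_._2)
  val combined = vals.tail.foldLeft(createCombiner(vals.head))(mergeValue)
  (k, combined._1 / combined._2.toDouble)
}
```

The key point is that the accumulator type (Int, Int) is built from the value type Int by the create-combiner function, rather than supplied as a constant zero value as in aggregateByKey.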

reduceByKey, aggregateByKey and combineByKey are preferred over groupByKey.

Reference: Avoid GroupByKey
