Spark difference between reduceByKey vs. groupByKey vs. aggregateByKey vs. combineByKey


Question

Can anyone explain the difference between reducebykey, groupbykey, aggregatebykey and combinebykey? I have read the documents regarding this, but couldn't understand the exact differences.

An explanation with examples would be great.

Answer

groupByKey:

Syntax:

sparkContext.textFile("hdfs://")
                    .flatMap(line => line.split(" "))
                    .map(word => (word, 1))
                    .groupByKey()
                    .map { case (word, counts) => (word, counts.sum) }

groupByKey can cause out-of-disk problems, as all the data is sent over the network and collected on the reducer workers.
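As a minimal runnable sketch of that behaviour (the sample data and the names lines, grouped and counts are illustrative and assume sc is an existing SparkContext), every (word, 1) pair crosses the network before anything is summed:

    // Illustrative data; in the snippet above the input comes from HDFS instead.
    val lines = sc.parallelize(Seq("spark spark hadoop", "spark hadoop"))

    val grouped = lines.flatMap(_.split(" "))
                       .map(word => (word, 1))
                       .groupByKey()          // RDD[(String, Iterable[Int])]: every 1 is shuffled
    val counts = grouped.mapValues(_.sum)     // the values are only summed after the shuffle

    counts.collect().foreach(println)         // e.g. (spark,3), (hadoop,2)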

reduceByKey:

Syntax:

sparkContext.textFile("hdfs://")
                    .flatMap(line => line.split(" "))
                    .map(word => (word, 1))
                    .reduceByKey((x, y) => x + y)

Data are combined at each partition, with only one output for one key at each partition to send over the network. reduceByKey requires combining all your values into another value with the exact same type.
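As a minimal sketch of the same-type constraint (the sample data and the names pairs and totalLengths are illustrative and assume sc is an existing SparkContext): the function passed to reduceByKey is (V, V) => V, so to produce a result of a different type the values have to be mapped first, or aggregateByKey/combineByKey used instead:

    // Illustrative data: (key, word) pairs with String values.
    val pairs = sc.parallelize(Seq(("a", "spark"), ("a", "hadoop"), ("b", "hive")))

    // Summing word lengths per key: convert the String values to Int first,
    // because reduceByKey can only combine values of the same type.
    val totalLengths = pairs.mapValues(_.length)          // RDD[(String, Int)]
                            .reduceByKey((x, y) => x + y) // partial sums per partition, then shuffle

    totalLengths.collect().foreach(println)               // e.g. (a,11), (b,4)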

aggregateByKey:

Same as reduceByKey, but it also takes an initial value.

Takes 3 parameters as input: i. an initial value, ii. sequence op logic (how a value is merged into the accumulator within a partition), iii. combiner logic (how accumulators from different partitions are merged).

Example:

    val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
    val data = sc.parallelize(keysWithValuesList)
    // Create key-value pairs
    val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
    val initialCount = 0
    val addToCounts = (n: Int, v: String) => n + 1
    val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
    val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)

Output (Aggregate By Key sum results): bar -> 3, foo -> 5 (foo occurs 5 times and bar 3 times in keysWithValuesList, and each occurrence adds 1 to its key's count).

combineByKey:

3 parameters as input:

  1. Initial value: unlike aggregateByKey, it need not always be a constant; we can pass a function that returns a new value.
  2. Merging function
  3. Combine function

Example:

val result = rdd.combineByKey(
                    (v) => (v, 1),
                    (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
                    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
                ).map { case (k, v) => (k, v._1 / v._2.toDouble) }
result.collect.foreach(println)
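The three functions build a running (sum, count) pair per key, and the final map divides the sum by the count, so result holds the average value per key. A hypothetical input for the snippet above (the answer does not show how rdd was built):

    // Hypothetical input, assuming rdd holds (String, Int) pairs.
    val rdd = sc.parallelize(Seq(("foo", 1), ("foo", 3), ("bar", 4)))
    // With this data, result.collect.foreach(println) prints the per-key averages:
    // (foo,2.0)
    // (bar,4.0)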

reduceByKey, aggregateByKey and combineByKey are preferred over groupByKey.

Reference: Avoid groupByKey
