How createCombiner, mergeValue, mergeCombiners work in combineByKey in Spark (using Scala)

Problem description

I am trying to understand how each step in combineByKey works.

Can someone please help me understand how this works for the RDD below?

val rdd = sc.parallelize(List(
  ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5), ("B", 4),
  ("B", 10), ("B", 11), ("B", 20), ("B", 25), ("C", 32), ("C", 91),
  ("C", 122), ("C", 3), ("C", 55)), 2)

rdd.combineByKey(
    (x: Int) => (x, 1),
    (acc: (Int, Int), x) => (acc._1 + x, acc._2 + 1),
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

Solution

First, let's break the process down:

First, createCombiner creates the initial value (the combiner) the first time a key is encountered on a partition, i.e. when no combiner exists for that key yet --> (firstValueEncountered, 1). So this is merely initializing a tuple with the first value and a counter of 1 for that key.

Then, mergeValue is triggered only if a combiner (a tuple in our case) has already been created for the key on this partition --> (existingTuple._1 + subsequentValue, existingTuple._2 + 1). This adds the newly encountered value to the existing tuple's value (in the first slot) and increments the existing tuple's counter (in the second slot). So, within each partition we keep a running (sum, count) per key.

Then, mergeCombiners takes the combiners (tuples) created on each partition and merges them together --> (tupleFromPartition1._1 + tupleFromPartition2._1, tupleFromPartition1._2 + tupleFromPartition2._2). This merely adds the values from each tuple together and the counters together into one tuple.
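
For reference, here is the same call with each of the three lambdas pulled out into a named value matching the corresponding combineByKey parameter (createCombiner, mergeValue, mergeCombiners); the val names themselves are only for illustration:

// Same logic as the call in the question, with the three functions named
// after the combineByKey parameters they are passed as.

// First value seen for a key on a partition -> (value, 1)
val createCombiner = (x: Int) => (x, 1)

// Fold another value for the same key into the partition-local (sum, count)
val mergeValue = (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1)

// Merge the per-partition (sum, count) tuples for a key across partitions
val mergeCombiners = (acc1: (Int, Int), acc2: (Int, Int)) =>
  (acc1._1 + acc2._1, acc1._2 + acc2._2)

val combined = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)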

Now, let's break up a subset of your data into partitions and see it in action:

("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)

Partition 1

A=3 --> createCombiner(3) ==> accum[A] = (3, 1)
A=9 --> mergeValue(accum[A], 9) ==> accum[A] = (3 + 9, 1 + 1) = (12, 2)
B=11 --> createCombiner(11) ==> accum[B] = (11, 1)

Partition 2

A=12 --> createCombiner(12) ==> accum[A] = (12, 1)
B=4 --> createCombiner(4) ==> accum[B] = (4, 1)
B=10 --> mergeValue(accum[B], 10) ==> accum[B] = (4 + 10, 1 + 1) = (14, 2)

Merge partitions together

A ==> mergeCombiners((12, 2), (12, 1)) ==> (12 + 12, 2 + 1) = (24, 3)
B ==> mergeCombiners((11, 1), (14, 2)) ==> (11 + 14, 1 + 2) = (25, 3)

So, you should get back an array something like this:

Array((A, (24, 3)), (B, (25, 3)))
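
If you want to check this yourself in the spark-shell, a sketch along these lines should work; glom() returns the contents of each partition, so you can see how parallelize actually distributed the records, which may differ from the hypothetical two-partition split used in the walkthrough above:

// Inspect the actual contents of each partition (may differ from the
// hypothetical split used in the walkthrough)
rdd.glom().collect().foreach(partition => println(partition.mkString(", ")))

// Collect the combineByKey result for the full RDD from the question:
// each key ends up with a (sum, count) tuple, e.g.
// Array((A,(29,5)), (B,(70,5)), (C,(303,5))), not necessarily in that order
val result = rdd.combineByKey(
    (x: Int) => (x, 1),
    (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1),
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

result.collect()

A common follow-up (not part of the question) is to call result.mapValues { case (sum, count) => sum.toDouble / count } to turn the (sum, count) pairs into per-key averages.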
