How createCombiner, mergeValue, mergeCombiner works in combineByKey in Spark (using Scala)
I am trying to understand how each step in combineByKey
works.
Can someone please help me understand the same for the RDD below?
val rdd = sc.parallelize(List(
  ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5), ("B", 4),
  ("B", 10), ("B", 11), ("B", 20), ("B", 25), ("C", 32), ("C", 91),
  ("C", 122), ("C", 3), ("C", 55)), 2)

rdd.combineByKey(
  (x: Int) => (x, 1),
  (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
First, let's break the process down:
First, createCombiner
creates the initial value (the combiner) the first time a key is encountered on a partition -->
(firstValueEncountered, 1)
. So, this merely initializes a tuple with the first value and a count of 1 for the key.
Then, mergeValue
is triggered only if a combiner (a tuple, in our case) has already been created for the key on this partition -->
(existingTuple._1 + subsequentValue, existingTuple._2 + 1)
. This adds the newly encountered value to the existing tuple's value (in the first slot) and increments the existing tuple's counter (in the second slot). So, we are keeping a running sum and a running count for each key within the partition.
Then, mergeCombiner
takes the combiners (tuples) created on each partition and merges them together -->
(tupleFromPartition1._1 + tupleFromPartition2._1, tupleFromPartition1._2 + tupleFromPartition2._2)
. This merely adds the two tuples' values together and their counters together into one tuple.
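To make the three roles concrete, here they are written as named Scala values with their types spelled out (for this example, the value type V is Int and the combiner type C is (Int, Int)); the spot-checks use the same numbers as the example below. This is just an illustrative sketch, not Spark's own code:

```scala
// The three arguments to combineByKey, named, with V = Int and C = (Int, Int).
val createCombiner: Int => (Int, Int) = v => (v, 1)
val mergeValue: ((Int, Int), Int) => (Int, Int) =
  (acc, v) => (acc._1 + v, acc._2 + 1)
val mergeCombiner: ((Int, Int), (Int, Int)) => (Int, Int) =
  (a, b) => (a._1 + b._1, a._2 + b._2)

// Spot-check each role on one key's values:
assert(createCombiner(3) == (3, 1))                 // first value seen on a partition
assert(mergeValue((3, 1), 9) == (12, 2))            // later value on the same partition
assert(mergeCombiner((12, 2), (12, 1)) == (24, 3))  // combiners from two partitions
```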
Then, let's break up a subset of your data into partitions and see it in action:
("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)
Partition 1
A=3 --> createCombiner(3) ==> accum[A] = (3, 1)
A=9 --> mergeValue(accum[A], 9) ==> accum[A] = (3 + 9, 1 + 1)
B=11 --> createCombiner(11) ==> accum[B] = (11, 1)
Partition 2
A=12 --> createCombiner(12) ==> accum[A] = (12, 1)
B=4 --> createCombiner(4) ==> accum[B] = (4, 1)
B=10 --> mergeValue(accum[B], 10) ==> accum[B] = (4 + 10, 1 + 1)
Merge partitions together
A ==> mergeCombiner((12, 2), (12, 1)) ==> (12 + 12, 2 + 1)
B ==> mergeCombiner((11, 1), (14, 2)) ==> (11 + 14, 1 + 2)
So, you should get back an array something like this:
Array((A, (24, 3)), (B, (25, 3)))
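The trace above can be reproduced without Spark. Here is a minimal plain-Scala sketch of the three phases, assuming the same (arbitrary) partition split as the walkthrough; in a real job Spark decides which records land on which partition:

```scala
val createCombiner: Int => (Int, Int) = v => (v, 1)
val mergeValue: ((Int, Int), Int) => (Int, Int) =
  (acc, v) => (acc._1 + v, acc._2 + 1)
val mergeCombiner: ((Int, Int), (Int, Int)) => (Int, Int) =
  (a, b) => (a._1 + b._1, a._2 + b._2)

// Phases 1-2: build one combiner per key within a single partition.
def combineLocally(part: Seq[(String, Int)]): Map[String, (Int, Int)] =
  part.foldLeft(Map.empty[String, (Int, Int)]) { case (acc, (k, v)) =>
    acc.get(k) match {
      case None       => acc.updated(k, createCombiner(v)) // first encounter of k here
      case Some(comb) => acc.updated(k, mergeValue(comb, v))
    }
  }

// Hypothetical split matching the walkthrough above.
val partition1 = Seq(("A", 3), ("A", 9), ("B", 11))
val partition2 = Seq(("A", 12), ("B", 4), ("B", 10))

// Phase 3: merge the per-partition combiners across partitions.
val merged: Map[String, (Int, Int)] =
  (combineLocally(partition1).toSeq ++ combineLocally(partition2).toSeq)
    .groupBy(_._1)
    .map { case (k, vs) => k -> vs.map(_._2).reduce(mergeCombiner) }

assert(merged("A") == (24, 3)) // (3 + 9) + 12, seen 2 + 1 times
assert(merged("B") == (25, 3)) // 11 + (4 + 10), seen 1 + 2 times
```

Note that the final (sum, count) tuples are what you would feed into a per-key average, e.g. mapValues { case (sum, count) => sum.toDouble / count }.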