How does Apache Beam's CombineValues operate over elements when executing arithmetic operations
Question
This is a bit of a contrived example, but I have been exploring the docs for CombineValues
and wish to understand what I'm seeing.
If I combine values and perform some arithmetic operations on the values (the goal is to calculate percentages of keys present in a bounded stream), then I need to use the AverageFn
(as defined in Example 8 in the docs and provided in the source code example snippets).
However, this (based on Example 5) does not work:
import apache_beam as beam

with beam.Pipeline() as pipeline:
    counts = ( pipeline
        | 'create' >> beam.Create(['xxx'])
        | 'key it' >> beam.Map(lambda elem: (elem, 1))
        | 'combine' >> beam.CombinePerKey(
            lambda values: sum(values)/2
        )
        | 'print' >> beam.Map(print)
    )
because it produces
('xxx', 0.25)
I ultimately want to get the total number of elements via
totals = pipeline | 'Count elements' >> beam.combiners.Count.Globally()
and then use the singleton approach they suggest (where I provide beam.pvalue.AsSingleton(totals) to beam.CombineValues).
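The arithmetic behind this plan (per-key count divided by the global total) can be sketched without Beam. This is a minimal, Beam-free illustration; the function name key_percentages and the sample input are hypothetical. The comments note which Beam transforms would presumably play each role, which is an assumption about how the pipeline would be wired.

```python
from collections import Counter


def key_percentages(elements):
    """Hypothetical helper: percentage of the total that each key represents."""
    # Per-key counts: the role beam.combiners.Count.PerKey() would play.
    per_key = Counter(elements)
    # Global total: the role Count.Globally() + AsSingleton(totals) would play.
    total = sum(per_key.values())
    # Divide each per-key count by the single global total.
    return {key: 100.0 * count / total for key, count in per_key.items()}


print(key_percentages(['xxx', 'yyy', 'xxx', 'xxx']))  # {'xxx': 75.0, 'yyy': 25.0}
```

In a pipeline, the final division would sit in a beam.Map that receives beam.pvalue.AsSingleton(totals) as a side input, so the total is computed once and shared by every key.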
My question is, why does CombineValues appear to execute twice (probably going to be some facepalming)?
Answer
The combiner is called twice because a combine runs in MapReduce-style phases: partial results are computed first and then merged, and the function can be applied at each phase. Since the function you are using (halving the sum) is not associative, you need an "advanced combiner" (a full CombineFn), as in Example 8 that you mention.
What is happening in your current code is that, from ('xxx', 1), it calculates half of the sum, ('xxx', 0.5), and then, when merging the values, it halves it again, giving ('xxx', 0.25).
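The double application can be reproduced with a simplified, Beam-free model. The assumption here is that a plain callable is applied once when partial results are merged and once more when the final output is extracted; simulate_callable_combine and half_sum are hypothetical names, and Beam's real wrapper may buffer inputs and re-apply the callable at other points depending on the runner.

```python
def half_sum(values):
    # The lambda from the question: sum(values)/2.
    return sum(values) / 2


def simulate_callable_combine(fn, values):
    """Hypothetical model of a callable used as a combiner in several phases."""
    # Phase 1: inputs are collected into a partial accumulator (a list).
    partial = list(values)
    # Phase 2: merging applies fn to the partial accumulator.
    merged = [fn(partial)]
    # Phase 3: extracting the output applies fn once more.
    return fn(merged)


print(simulate_callable_combine(half_sum, [1]))  # 0.25, not 0.5
print(simulate_callable_combine(sum, [1, 2, 3]))  # 6, sum is safe to re-apply
```

An associative function such as sum gives the same answer however many times it is re-applied to its own partial results, which is why it works as a plain callable while sum(values)/2 does not.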
In this answer I explain a similar concept.
For your particular case, as mentioned, you need an "advanced combiner" (a custom CombineFn):
import apache_beam as beam

with beam.Pipeline() as pipeline:
    def combiner(elements):
        print(elements)
        return sum(elements)/2

    class HalfMean(beam.CombineFn):
        def create_accumulator(self):
            # Tuple of sum, count
            return (0, 0)

        def add_input(self, accumulator, input):
            # Add the current element to sum, add one to count
            new_tuple = (accumulator[0] + input, accumulator[1] + 1)
            return new_tuple

        def merge_accumulators(self, accumulators):
            # Materialize once, in case the runner passes a one-shot iterable
            accumulators = list(accumulators)
            # Join all accumulators
            partial_sums = [x[0] for x in accumulators]
            partial_counts = [x[1] for x in accumulators]
            merged_tuple = (sum(partial_sums), sum(partial_counts))
            return merged_tuple

        def extract_output(self, sum_count_tuple):
            # Now we can output half of the mean
            mean = sum_count_tuple[0]/sum_count_tuple[1]
            return mean/2

    counts = ( pipeline
        | 'create' >> beam.Create(['xxx'])
        | 'key it' >> beam.Map(lambda elem: (elem, 1))
        #| 'combine' >> beam.CombinePerKey(combiner)
        | 'advance combine' >> beam.CombinePerKey(HalfMean())
        | 'print' >> beam.Map(print)
    )
I'm leaving your old combiner, with a print, so you can see what's happening.
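Why the CombineFn version is safe can be checked without Beam: its result should not depend on how the inputs are split into partial accumulators. The sketch below uses HalfMeanPlain, a hypothetical Beam-free copy of HalfMean, and run_combine, a hypothetical driver that builds one accumulator per bundle, merges them (twice, to mimic repeated merging), and extracts the output. Real runners choose the bundling themselves; the splits here are only examples.

```python
class HalfMeanPlain:
    """Hypothetical Beam-free copy of the HalfMean CombineFn, for testing."""

    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add_input(self, acc, element):
        return (acc[0] + element, acc[1] + 1)

    def merge_accumulators(self, accumulators):
        accumulators = list(accumulators)
        return (sum(a[0] for a in accumulators), sum(a[1] for a in accumulators))

    def extract_output(self, acc):
        # Half of the mean, computed only once at the very end.
        return acc[0] / acc[1] / 2


def run_combine(fn, bundles):
    """Hypothetical driver: accumulate per bundle, merge, then extract."""
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for element in bundle:
            acc = fn.add_input(acc, element)
        partials.append(acc)
    merged = fn.merge_accumulators(partials)
    # Merging an already merged accumulator again must not change it.
    merged = fn.merge_accumulators([merged])
    return fn.extract_output(merged)


fn = HalfMeanPlain()
print(run_combine(fn, [[1]]))  # 0.5
print(run_combine(fn, [[1, 2, 3]]))  # 1.0
print(run_combine(fn, [[1], [2, 3]]))  # 1.0, same result with a different split
```

Because the accumulator carries (sum, count) rather than an already halved value, merging is associative and the halving happens exactly once, in extract_output.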
Anyhow, that is still not CombineValues but CombinePerKey. CombineValues takes a key-value pair whose value is an iterable, and applies the combiner to it. In the following case, the elements it receives are ('a', [1, 2, 3]) and ('b', [10]). Here is the example:
# Assumes `pipeline` and `HalfMean` from the previous snippet are in scope
kvs = [('a', 1),
       ('a', 2),
       ('a', 3),
       ('b', 10),
       ]

combine_values = (pipeline
    | 'create_kvs' >> beam.Create(kvs)
    | 'gbk' >> beam.GroupByKey()
    | 'combine values' >> beam.CombineValues(HalfMean())
    | 'print cv' >> beam.Map(print)
)
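The GroupByKey + CombineValues step above can be mimicked in plain Python to see exactly what each stage produces. This is a Beam-free sketch; group_by_key, half_mean and combine_values are hypothetical stand-ins for the Beam transforms, and the output order here simply follows first-seen key order, whereas a real pipeline makes no ordering guarantee.

```python
from collections import defaultdict

kvs = [('a', 1), ('a', 2), ('a', 3), ('b', 10)]


def group_by_key(pairs):
    # Mimics GroupByKey: collect every value for a key into one list.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return list(grouped.items())


def half_mean(values):
    values = list(values)
    return sum(values) / len(values) / 2


def combine_values(grouped, fn):
    # Mimics CombineValues: apply fn to the iterable value of each (key, values) pair.
    return [(key, fn(values)) for key, values in grouped]


grouped = group_by_key(kvs)
print(grouped)  # [('a', [1, 2, 3]), ('b', [10])]
print(combine_values(grouped, half_mean))  # [('a', 1.0), ('b', 5.0)]
```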