How does Apache Beam's CombineValues operate over elements when executing arithmetic operations


Question

This is a bit of a contrived example, but I have been exploring the docs for CombineValues and wish to understand what I'm seeing.

If I combine values and perform some arithmetic operations on them (the goal is to calculate percentages of keys present in a bounded stream), then I need to use the AverageFn (as defined in Example 8 in the docs and provided in the source code example snippets).

However, this (based on Example 5) does not work:

import apache_beam as beam

with beam.Pipeline() as pipeline:

  counts = ( pipeline
             | 'create' >>  beam.Create(['xxx'])
             | 'key it' >> beam.Map(lambda elem: (elem, 1))
             | 'combine' >> beam.CombinePerKey(
                 lambda values: sum(values)/2
                 )
             | 'print' >> beam.Map(print)
           )

because it produces

('xxx', 0.25)

Ultimately, I want to compute the count via

  totals = pipeline | 'Count elements' >> beam.combiners.Count.Globally()

and then use the singleton approach they suggest (where I provide beam.pvalue.AsSingleton(totals) to beam.CombineValues).

My question is, why does CombineValues appear to execute twice (probably going to be some facepalming)?

Recommended answer

The combiner is called twice because of the MapReduce-style phases of a combine. Since the function you are using (halving the mean) is not associative, you need an "advanced combiner" (a CombineFn), as in the Example 8 you mention.

What happens in your current code is that, from (xxx, 1), it calculates the half mean (xxx, 0.5) and then, when merging the values, it halves it again, giving (xxx, 0.25).
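The double application can be reproduced without Beam. The following is a minimal sketch in plain Python, assuming the runner first applies the callable to each bundle of values and then applies it again to the partial results. The names `half_sum` and `lifted_combine` are illustrative and are not Beam APIs.

```python
def half_sum(values):
    # Same logic as the lambda passed to CombinePerKey in the question.
    return sum(values) / 2


def lifted_combine(fn, bundles):
    # Hypothetical stand-in for a two-phase combine:
    # phase 1 combines each bundle, phase 2 combines the partial results.
    partials = [fn(bundle) for bundle in bundles]
    return fn(partials)


# One element with value 1, as in ('xxx', 1).
result = lifted_combine(half_sum, [[1]])
print(result)  # 0.25
```

Because the same callable runs in both phases, any function whose output is not a valid input for a second pass (such as one that divides) gives a skewed result.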

In this answer I explain a similar concept.

For your particular case, as mentioned, you need an "advanced combiner":

import apache_beam as beam

with beam.Pipeline() as pipeline:

    def combiner(elements):
        print(elements)
        return sum(elements)/2

    class HalfMean(beam.CombineFn):
        def create_accumulator(self):
            # Tuple of sum, count
            return (0, 0)

        def add_input(self, accumulator, input):
            # Add the current element to sum, add one to count
            new_tuple = (accumulator[0] + input, accumulator[1] + 1)
            return new_tuple

        def merge_accumulators(self, accumulators):
            # Join all accumulators
            partial_sums = [x[0] for x in accumulators]
            partial_counts = [x[1] for x in accumulators]
            merged_tuple = (sum(partial_sums), sum(partial_counts))
            return merged_tuple

        def extract_output(self, sum_count_tuple):
            # Now we can output half of the mean
            mean = sum_count_tuple[0]/sum_count_tuple[1]
            return mean/2

    counts = ( pipeline
             | 'create' >> beam.Create(['xxx'])
             | 'key it' >> beam.Map(lambda elem: (elem, 1))
             #| 'combine' >> beam.CombinePerKey(combiner)
             | 'advance combine' >> beam.CombinePerKey(HalfMean())
             | 'print' >> beam.Map(print)
           )
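To see why the accumulator form is safe to apply in phases, the four methods can be driven by hand. This is a sketch in plain Python that mirrors the HalfMean logic as an ordinary class with no Beam dependency. `HalfMeanSketch`, `run`, and the split into two bundles are assumptions for illustration; a real runner decides how elements are bundled.

```python
class HalfMeanSketch:
    """Plain-Python mirror of the HalfMean methods, for illustration only."""

    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add_input(self, accumulator, value):
        return (accumulator[0] + value, accumulator[1] + 1)

    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return (sum(sums), sum(counts))

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count / 2


def run(fn, bundles):
    # Build one accumulator per bundle, merge them, then extract once.
    accumulators = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for value in bundle:
            acc = fn.add_input(acc, value)
        accumulators.append(acc)
    return fn.extract_output(fn.merge_accumulators(accumulators))


fn = HalfMeanSketch()
one_bundle = run(fn, [[1, 2, 3]])
two_bundles = run(fn, [[1, 2], [3]])
print(one_bundle, two_bundles)  # 1.0 1.0
```

The division happens only in `extract_output`, which runs once per key after all merging, so the result does not depend on how the input was split.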

I'm leaving your old combiner in, with a print added, so you can see what's happening.

Anyhow, that is still not a CombineValues but a CombinePerKey. CombineValues takes a key-value pair in which the value is an iterable, and applies the combiner to it. In the following case, the elements it takes are ('a', [1, 2, 3]) and ('b', [10]). Here is the example:

    kvs = [
        ('a', 1),
        ('a', 2),
        ('a', 3),
        ('b', 10),
    ]

    combine_values = (pipeline
             | 'create_kvs' >> beam.Create(kvs)
             | 'gbk' >> beam.GroupByKey()
             | 'combine values' >> beam.CombineValues(HalfMean())
             | 'print cv' >> beam.Map(print)
           )
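As a rough check of what this pipeline should print, the grouping and the half-mean can be reproduced in plain Python. This is only a sketch: the dictionary stands in for GroupByKey and the comprehension for CombineValues(HalfMean()), and neither is a Beam API. In Beam the results are emitted as tuples such as ('a', 1.0) and ('b', 5.0), and their order is not guaranteed.

```python
from collections import defaultdict

kvs = [('a', 1), ('a', 2), ('a', 3), ('b', 10)]

# Stand-in for GroupByKey: collect the values of each key into a list.
grouped = defaultdict(list)
for key, value in kvs:
    grouped[key].append(value)

# Stand-in for CombineValues(HalfMean()): half of the mean per key.
combined = {key: sum(values) / len(values) / 2 for key, values in grouped.items()}

print(dict(grouped))  # {'a': [1, 2, 3], 'b': [10]}
print(combined)       # {'a': 1.0, 'b': 5.0}
```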
