Side output in ParDo | Apache Beam Python SDK


Question

As the documentation is only available for Java, I could not really understand what it means.

It states: "While ParDo always produces a main output PCollection (as the return value from apply), you can also have your ParDo produce any number of additional output PCollections. If you choose to have multiple outputs, your ParDo will return all of the output PCollections (including the main output) bundled together. For example, in Java, the output PCollections are bundled in a type-safe PCollectionTuple."

I understand what bundled together means, but if I am yielding to a tag in my DoFn, does it yield with all the other outputs empty as it goes, emitting each output as it is encountered in the code? Or does it wait for all the yields for an input to be ready and then output them together as a bundle?

There isn't much clarity around this in the documentation. I think it doesn't wait and just yields when encountered, but I still need to understand what is happening.

Answer

The best way to answer this is with an example, and this example is available in Beam.

Suppose that you want to run a word count pipeline (e.g. count the number of times each word appears in a document). For this you need to split the lines in a file into individual words. Consider that you also want to count word lengths separately. Your splitting transform would look like this:

import re

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText

# pipeline_options and known_args come from the usual argparse setup
# (not shown here).
with beam.Pipeline(options=pipeline_options) as p:

    lines = p | ReadFromText(known_args.input)  # Read in the file

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    split_lines_result = (lines
                          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
                              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
                              main='words'))

    # The main output and each tagged output come back as separate
    # PCollections.
    words = split_lines_result['words']
    character_count = split_lines_result[
        SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT]
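
From here, each output is an ordinary PCollection that can be consumed independently downstream. As a minimal sketch of a possible continuation inside the same pipeline (the count logic and the known_args.output paths are illustrative assumptions, not part of the original example):

    # Count how many times each word appears and write the results out
    # (assumed continuation, not from the original example).
    (words
     | beam.combiners.Count.PerElement()
     | beam.Map(lambda word_count: '%s: %d' % word_count)
     | beam.io.WriteToText(known_args.output + '-words'))

    # Sum the per-line character counts into a single total.
    (character_count
     | beam.CombineGlobally(sum)
     | beam.Map(str)
     | beam.io.WriteToText(known_args.output + '-chars'))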

In this case, each one is a different PCollection with the right elements. The DoFn is in charge of splitting its output, and it does so by tagging elements. See:

class SplitLinesToWordsFn(beam.DoFn):
  OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'

  def process(self, element):
    # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
    # collection.
    yield pvalue.TaggedOutput(
        self.OUTPUT_TAG_CHARACTER_COUNT, len(element))

    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      # yield word to add it to the main collection.
      yield word

As you can see, for the main output you do not need to tag the elements, but for the other outputs you do.
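
To see the whole thing end to end, here is a minimal runnable sketch that reuses the DoFn above but swaps the file source for an in-memory beam.Create (the input strings and the print steps are my additions for illustration):

with beam.Pipeline() as p:
  result = (
      p
      | beam.Create(['the quick brown fox', 'jumps over the lazy dog'])
      | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
          SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT, main='words'))

  # Tagged outputs can also be accessed as attributes of the result tuple,
  # using the tag string as the attribute name.
  result.words | 'PrintWords' >> beam.Map(print)
  result.tag_character_count | 'PrintCounts' >> beam.Map(print)

Attribute access (result.tag_character_count) and dictionary-style access (result[SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT]) are equivalent here.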

