Side output in ParDo | Apache Beam Python SDK


Question

As the documentation is only available for Java, I could not really understand what it means.

It says: "While ParDo always produces a main output PCollection (as the return value from apply), you can also have your ParDo produce any number of additional output PCollections. If you choose to have multiple outputs, your ParDo will return all of the output PCollections (including the main output) bundled together. For example, in Java, the output PCollections are bundled in a type-safe PCollectionTuple."

I understand what "bundled together" means, but if I am yielding to a tag in my DoFn, does it yield each tagged output as it is encountered in the code, with all the other outputs still empty at that point? Or does it wait for all the yields for an input to be ready and then output them all together in a bundle?

There isn't much clarity around this in the documentation. I think it doesn't wait and just yields when encountered, but I still need to understand what is happening.
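For intuition, a DoFn's process method in the Python SDK is an ordinary generator, so each yield hands one value over the moment it runs; nothing is buffered until the end of the element. A minimal plain-Python sketch (not Beam itself, just generator semantics):

```python
# Plain-Python sketch (hypothetical, not Beam): a generator yields each
# value the moment the yield statement runs; it never waits to batch them.
def process(element):
    yield ("tag_character_count", len(element))  # emitted first
    for word in element.split():
        yield word                               # emitted one at a time

outputs = list(process("hello side outputs"))
# outputs == [("tag_character_count", 18), "hello", "side", "outputs"]
```

The tagged count comes out first, then each word in the order it is reached, which matches the "yields when encountered" reading.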

Answer

The best way to answer this is with an example, and this example is available in Beam.

Suppose that you want to run a word count pipeline (e.g. count the number of times each word appears in a document). For this you need to split the lines in a file into individual words. Suppose that you also want to count word lengths separately. Your splitting transform would look like this:

import apache_beam as beam
from apache_beam.io import ReadFromText

with beam.Pipeline(options=pipeline_options) as p:

    lines = p | ReadFromText(known_args.input)  # Read in the file.

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    split_lines_result = (lines
                          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
                              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
                              main='words'))

    short_words = split_lines_result['words']
    character_count = split_lines_result[
        SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT]

In this case, short_words and character_count are each a different PCollection, with the right elements. The DoFn is in charge of splitting its outputs, and it does this by tagging elements. See:

import re

from apache_beam import pvalue


class SplitLinesToWordsFn(beam.DoFn):
  OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'

  def process(self, element):
    # Yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
    # collection.
    yield pvalue.TaggedOutput(
        self.OUTPUT_TAG_CHARACTER_COUNT, len(element))

    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      # Yield word to add it to the main (untagged) collection.
      yield word

As you can see, you do not need to tag elements destined for the main output, but you do need to tag elements for the other outputs.
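To make the routing concrete, here is a plain-Python sketch of how yields get sorted into collections. The TaggedOutput namedtuple and the toy runner below are hypothetical stand-ins for illustration, not Beam's actual classes: each yielded value is routed as soon as it is produced, untagged values to the main collection and tagged values to their tagged collection.

```python
import re
from collections import namedtuple

# Hypothetical stand-in for beam.pvalue.TaggedOutput, for illustration only.
TaggedOutput = namedtuple("TaggedOutput", ["tag", "value"])

OUTPUT_TAG_CHARACTER_COUNT = "tag_character_count"

def process(element):
    # Same shape as SplitLinesToWordsFn.process above.
    yield TaggedOutput(OUTPUT_TAG_CHARACTER_COUNT, len(element))
    for word in re.findall(r"[A-Za-z']+", element):
        yield word

def run_and_route(elements):
    # Toy "runner": untagged yields go to the main collection, TaggedOutput
    # yields go to their tagged collection, each routed as soon as it is
    # yielded -- nothing waits for the element to finish.
    collections = {"words": [], OUTPUT_TAG_CHARACTER_COUNT: []}
    for element in elements:
        for out in process(element):
            if isinstance(out, TaggedOutput):
                collections[out.tag].append(out.value)
            else:
                collections["words"].append(out)
    return collections

result = run_and_route(["hello beam"])
# result["words"] == ["hello", "beam"]
# result["tag_character_count"] == [10]
```

This mirrors what with_outputs gives you back: one collection per tag, plus the main one.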
