Beam: Why does ReadAllFromText behave differently when a DoFn returns a string vs. a list?


Question


I have a pipeline that reads files from GCS, triggered through Pub/Sub:

```python
import json
import logging

import apache_beam as beam


class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        # Drop the last '/'-separated part of the message id and prefix gs://.
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name  # emits one str per input message


class LogFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return [element]  # a one-item list: exactly one output element


class LogPassThroughFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return element  # returns the bare str, which is itself iterable

...

(p
 | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
 | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
 | "Extract File Name" >> beam.ParDo(ExtractFileNameFn())
 | 'Log Results' >> beam.ParDo(LogFn())
 # | 'Log Results' >> beam.ParDo(LogPassThroughFn())
 | "Read File from GCS" >> beam.io.ReadAllFromText())
```


The difference between LogFn and LogPassThroughFn is the type of the return value: LogFn returns a list, LogPassThroughFn returns a string. LogFn works well in test code, but LogPassThroughFn makes the pipeline fail to run. Per the answer to this issue:


The Beam Python SDK still tries to interpret the output of that ParDo as if it were a collection of elements, and it does so by interpreting the string you emitted as a collection of characters.
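The quoted explanation can be reproduced without Beam at all. The sketch below is plain Python; `run_pardo` is a hypothetical stand-in, not part of Beam's API, that models one assumption about the runner: it iterates over whatever `process()` returns and treats each item as a separate output element.

```python
from typing import Any, Callable, Iterable, List


def run_pardo(process: Callable[[Any], Iterable[Any]], inputs: Iterable[Any]) -> List[Any]:
    # Hypothetical stand-in for ParDo: every item obtained by iterating over
    # process()'s return value becomes its own output element.
    outputs: List[Any] = []
    for element in inputs:
        outputs.extend(process(element))
    return outputs


def log_fn(element):
    # Mirrors LogFn.process: a one-item list, so exactly one output.
    return [element]


def log_pass_through_fn(element):
    # Mirrors LogPassThroughFn.process: a bare str, which is itself iterable.
    return element


print(run_pardo(log_fn, ["gs://bucket/a.txt"]))
# ['gs://bucket/a.txt']
print(run_pardo(log_pass_through_fn, ["gs://b"]))
# ['g', 's', ':', '/', '/', 'b']
```

Under this model, the pass-through version would hand ReadAllFromText single characters such as "g" and ":" as file patterns, which is consistent with the pipeline failing.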


We know LogFn should work correctly.


However, I notice that ExtractFileNameFn returns a string rather than a list. Is that correct? So I tested it as below, returning a list in ExtractFileNameFn1:

```python
class ExtractFileNameFn1(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield [file_name]  # yields a one-item list instead of a str

...

(p
 | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
 | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
 | "Extract File Name" >> beam.ParDo(ExtractFileNameFn1())
 | "Read File from GCS" >> beam.io.ReadAllFromText())
```


Now the pipeline fails to run.


My question is: what is the difference between returning a string and returning a list in a DoFn? Why can ReadAllFromText receive a string from ExtractFileNameFn, but a list from LogFn?

Beam version: 2.14.0

Recommended answer

The ParDo documentation says:


Note that the DoFn must return an iterable for each element of the input PCollection. An easy way to do this is to use the yield keyword in the process method.

https://beam.apache.org/releases/pydoc/2.6.0/apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo


The purpose of returning an iterable is that your input elements may not map 1-1 with your output elements. A single input may produce multiple outputs.
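As an illustration of the one-to-many case, here is a plain-Python sketch with a hypothetical `process()` body (not from the question) that splits a comma-separated line of paths. The final list comprehension flattens the per-element iterables, which is an assumed, simplified model of what a ParDo does with them.

```python
def split_paths(element):
    # Hypothetical process() body: one input line may hold several
    # comma-separated paths, so it can yield zero, one, or many outputs.
    for path in element.split(","):
        path = path.strip()
        if path:
            yield path


inputs = ["gs://b/a.txt, gs://b/b.txt", "", "gs://b/c.txt"]
# Flatten each element's iterable of outputs into one output collection.
outputs = [out for element in inputs for out in split_paths(element)]
print(outputs)
# ['gs://b/a.txt', 'gs://b/b.txt', 'gs://b/c.txt']
```

Here three inputs map to two, zero, and one outputs respectively, which a strictly one-to-one return value could not express.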


You can yield them as you go, or you can gather them into a list and return that list at the end.
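A minimal sketch of the two styles in plain Python, using a hypothetical word-splitting body: both functions produce the same sequence of outputs, the generator version simply produces them lazily.

```python
from typing import Iterator, List


def process_with_yield(element: str) -> Iterator[str]:
    # Emit outputs one at a time; calling this returns a generator.
    for word in element.split():
        yield word


def process_with_list(element: str) -> List[str]:
    # Collect all outputs first, then return them together as one list.
    outputs = []
    for word in element.split():
        outputs.append(word)
    return outputs


print(list(process_with_yield("a b c")))  # ['a', 'b', 'c']
print(process_with_list("a b c"))         # ['a', 'b', 'c']
```

The yield style avoids building the whole list in memory when one input fans out into many outputs; returning a list is equally valid when the outputs are few.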

So:

```python
class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name  # one str output element per input
```

is equivalent to:

```python
class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        return [file_name]  # a one-item list: still one str output element
```


The output elements for both are strings, each output element being a file name.


When you do yield [file_name], each output element is actually a list containing a string.
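To see the element types concretely, this plain-Python sketch (no Beam required) calls the two `process()` bodies from the question directly as generator functions. The `message` dict and its `'id'` value are hypothetical, chosen only to exercise the path logic.

```python
def extract_file_name(element):
    # Same path logic as ExtractFileNameFn.process: yields a str.
    yield 'gs://' + "/".join(element['id'].split("/")[:-1])


def extract_file_name_1(element):
    # Same as ExtractFileNameFn1.process: wraps the path in a list.
    yield ['gs://' + "/".join(element['id'].split("/")[:-1])]


message = {'id': 'my-bucket/data/file.txt/1234567890'}  # hypothetical message
print(list(extract_file_name(message)))
# ['gs://my-bucket/data/file.txt']
print(list(extract_file_name_1(message)))
# [['gs://my-bucket/data/file.txt']]
```

ReadAllFromText reads a PCollection of file patterns, i.e. it expects each element to be a string, so an element that is itself a list is not a valid input for it.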
