Why can't a custom Python object be used with a ParDo Fn?


Problem Description

I'm new to using Apache Beam in Python with the Dataflow runner. I'm interested in creating a batch pipeline that publishes to Google Cloud PubSub; I tinkered with the Beam Python APIs and found a solution. However, during my exploration I ran into some interesting problems that made me curious.

1. The Successful Pipeline

Currently, my successful Beam pipeline for publishing data in batch mode from GCS looks like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    options = PipelineOptions(flags=argv)

    from datapipes.common.dataflow_utils import CsvFileSource
    from datapipes.protos import proto_schemas_pb2
    from google.protobuf.json_format import MessageToJson

    with beam.Pipeline(options=options) as p:
        normalized_data = (
                p |
                "Read CSV from GCS" >> beam.io.Read(CsvFileSource(
                    "gs://bucket/path/to/file.csv")) |
                "Normalize to Proto Schema" >> beam.Map(
                        lambda data: MessageToJson(
                            proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
                            indent=0,
                            preserving_proto_field_name=True)
                    )
        )
        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                    PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
            )

2. The Unsuccessful Pipelines

Here, I attempted to make the publisher shared across DoFns. I attempted the following methods.

a. Initialize the publisher in the DoFn

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
             max_bytes=1024,  # One kilobyte
             max_latency=1,  # One second
         )
        self.publisher = pubsub_v1.PublisherClient(batch_settings)
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()

def run_gcs_to_pubsub(argv):
    ... ## same as 1

b. Initialize the publisher outside the DoFn and pass it to the DoFn

class PublishFn(beam.DoFn):
    def __init__(self, publisher, topic_path):
        self.publisher = publisher
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    .... ## same as 1

    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)

    with beam.Pipeline(options=options) as p:
        ... # same as 1
        (normalized_data | 
            "Write to PubSub" >> beam.ParDo(
                PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
        )

Both attempts at making the publisher shared across DoFn methods failed with the following error messages:

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
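
For context, the same TypeError can likely be reproduced outside Beam by pickling the client directly, since its underlying gRPC channel is a Cython extension type with no __reduce__ implementation. This is an illustrative local check, not part of the original question, and assumes google-cloud-pubsub is installed:

import pickle

from google.cloud import pubsub_v1

# Beam serializes the DoFn (including its attributes) at pipeline submission time.
# Doing the equivalent by hand on the client shows the same failure mode:
publisher = pubsub_v1.PublisherClient()
pickle.dumps(publisher)  # raises TypeError: no default __reduce__ due to non-trivial __cinit__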

My questions are:

  1. Would a shared publisher implementation improve Beam pipeline performance? If yes, I would like to explore this solution.

  2. Why do the errors occur in my failing pipelines? Is it because I initialize the custom class object and pass it to the DoFn outside the process function? If so, how can I implement a pipeline so that I can reuse a custom object in a DoFn?

Thank you, your help would be greatly appreciated.

Okay, so Ankur has explained why my problem occurs and discussed how serialization is done on a DoFn. Based on this, I now understand that there are two solutions for making a custom object shared/reusable in a DoFn:

  1. Make the custom object serializable: this allows the object to be initialized/available during DoFn object creation (in __init__). The object must be serializable, since it will be serialized during pipeline submission, when the DoFn object is created (which calls __init__). How to achieve this is answered below in my answer (see also the sketch after this list). I also found out that this requirement is actually associated with the Beam documentation under [1][2].

  2. Initialize non-serializable objects in the DoFn's functions outside __init__ to avoid serialization, since functions outside __init__ aren't called during pipeline submission. How to accomplish this is explained in Ankur's answer.
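
For option 1, one common way to make the DoFn itself safely serializable is to exclude the unpicklable client from the pickled state and recreate it lazily on the worker. The __getstate__ hook and helper below are an illustrative sketch under that assumption, not the exact code from my answer:

import apache_beam as beam


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        self._publisher = None

    def __getstate__(self):
        # Drop the non-serializable client before the DoFn is pickled for
        # pipeline submission; only plain-data attributes survive.
        state = self.__dict__.copy()
        state['_publisher'] = None
        return state

    def _get_publisher(self):
        # Recreate the client lazily on the worker after deserialization.
        if self._publisher is None:
            from google.cloud import pubsub_v1
            self._publisher = pubsub_v1.PublisherClient()
        return self._publisher

    def process(self, element, **kwargs):
        future = self._get_publisher().publish(
            self.topic_path, data=element.encode("utf-8"))
        yield future.result()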

References:

[1] https://beam.apache.org/documentation/programming-guide/#core-beam-transforms

[2] https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms

Recommended Answer

The PublisherClient cannot be pickled correctly. More on pickling here. Initializing the PublisherClient in the process method avoids the pickling of the PublisherClient.

If the intent is to reuse the PublisherClient, I would recommend initializing the PublisherClient in the process method and storing it on self, using the following code.

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        # Create the client lazily on the worker, on the first element
        # processed, so it is never part of the pickled DoFn state.
        if not hasattr(self, 'publisher'):
            from google.cloud import pubsub_v1
            self.publisher = pubsub_v1.PublisherClient()
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()
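
On newer Beam Python SDK versions that provide the DoFn.setup() lifecycle method, the same per-worker initialization can also be written without the hasattr check. This variant is an illustrative alternative, not part of the original answer:

import apache_beam as beam


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        # Only plain, picklable configuration is stored at construction time.
        self.topic_path = topic_path
        self.publisher = None

    def setup(self):
        # Runs on the worker after the DoFn has been deserialized, so the
        # gRPC-backed client is never pickled.
        from google.cloud import pubsub_v1
        self.publisher = pubsub_v1.PublisherClient()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        yield future.result()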

