Why can't a custom Python object be used with a ParDo Fn?


Problem description

I'm currently new to using Apache Beam in Python with the Dataflow runner. I'm interested in creating a batch pipeline that publishes to Google Cloud Pub/Sub. I tinkered with the Beam Python APIs and found a solution, but during my exploration I ran into some interesting problems that made me curious.

1. The Successful Pipeline

Currently, my successful Beam pipeline for publishing data in a batch manner from GCS looks like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        # The client is created inside process, so it never has to be pickled.
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
        # Block until the message is published and return its message ID.
        return future.result()


def run_gcs_to_pubsub(argv):
    options = PipelineOptions(flags=argv)

    from datapipes.common.dataflow_utils import CsvFileSource
    from datapipes.protos import proto_schemas_pb2
    from google.protobuf.json_format import MessageToJson, ParseDict

    with beam.Pipeline(options=options) as p:
        normalized_data = (
                p |
                "Read CSV from GCS" >> beam.io.Read(CsvFileSource(
                    "gs://bucket/path/to/file.csv")) |
                "Normalize to Proto Schema" >> beam.Map(
                        # Assumes CsvFileSource yields one dict per row; ParseDict
                        # maps that dict onto the generated proto message.
                        lambda data: MessageToJson(
                            ParseDict(data, proto_schemas_pb2.MySchema()),
                            indent=0,
                            preserving_proto_field_name=True)
                    )
        )
        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                    PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
            )
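For completeness, a minimal sketch of how this entry point might be invoked (not from the original post; the runner, project, region, and temp_location values below are placeholders):

import sys

if __name__ == "__main__":
    # Hypothetical invocation; all flag values are placeholders.
    run_gcs_to_pubsub(sys.argv[1:] + [
        "--runner=DataflowRunner",
        "--project=my-gcp-project",
        "--region=us-central1",
        "--temp_location=gs://bucket/tmp",
    ])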

2. The Unsuccessful Pipelines

Here, I attempted to make the publisher shared across the DoFn. I tried the following approaches.

a. Initialize the publisher in the DoFn's __init__

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        from google.cloud import pubsub_v1

        # Creating the client here makes it part of the DoFn's state, so it
        # must be pickled when the pipeline is submitted.
        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,  # One second
        )
        self.publisher = pubsub_v1.PublisherClient(batch_settings)
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()

def run_gcs_to_pubsub(argv):
    ... ## same as 1

b. Initialize the publisher outside the DoFn and pass it in

class PublishFn(beam.DoFn):
    def __init__(self, publisher, topic_path):
        # The client is constructed outside and handed to the DoFn, so it is
        # captured in the DoFn's state and pickled at submission time.
        self.publisher = publisher
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    .... ## same as 1

    from google.cloud import pubsub_v1

    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)

    with beam.Pipeline(options=options) as p:
        ... # same as 1
        (normalized_data | 
            "Write to PubSub" >> beam.ParDo(
                PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
        )

Both attempts to share the publisher across the DoFn failed with the following error message:

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

My questions are:

  1. Would the shared publisher implementation improve Beam pipeline performance? If yes, then I would like to explore this solution.

  2. Why do the errors occur in my failing pipelines? Is it because I initialize the custom class object and pass it to the DoFn outside the process function? If so, how can I implement the pipeline so that I am able to reuse a custom object in a DoFn?

Thank you, your help would be greatly appreciated.

Okay, so Ankur has explained why my problem occurs and discussed how serialization is done on a DoFn. Based on this knowledge, I now understand that there are two solutions for making a custom object shared/reusable in a DoFn:

  1. Make the custom object serializable: this allows the object to be initialized/available during DoFn object creation (under __init__). The object must be serializable since it will get serialized during pipeline submission, which is when the DoFn object is created (and __init__ is called). How to achieve this is answered below in my answer (see also the sketch after this list). I also found out that this requirement is actually stated in the Beam documentation under [1][2].

  2. Initialize non-serializable objects in the DoFn's methods outside __init__ to avoid serialization, since methods other than __init__ are not called during pipeline submission. How to accomplish this is explained in Ankur's answer.
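A minimal sketch of approach 1 (illustrative, not from the original post): drop the non-picklable client from the DoFn's pickled state via __getstate__/__setstate__ and recreate it lazily on the worker.

import apache_beam as beam


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        self._publisher = None  # created lazily on the worker

    def __getstate__(self):
        # Exclude the non-picklable client from the serialized state.
        state = self.__dict__.copy()
        state['_publisher'] = None
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

    def process(self, element, **kwargs):
        if self._publisher is None:
            from google.cloud import pubsub_v1
            self._publisher = pubsub_v1.PublisherClient()
        future = self._publisher.publish(self.topic_path, data=element.encode("utf-8"))
        yield future.result()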

References:

[1] https://beam.apache.org/documentation/programming-guide/#core-beam-transforms

[2] https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms

Recommended answer

PublisherClient cannot be pickled correctly. More on pickling here. Initializing the PublisherClient in the process method avoids the pickling of PublisherClient.
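A quick way to reproduce the same failure outside Beam (a sketch, assuming google-cloud-pubsub is installed; Beam uses its own pickler, but the underlying gRPC channel is what refuses to serialize):

import pickle

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# Expected to raise TypeError: no default __reduce__ due to non-trivial __cinit__,
# because the client holds a grpc._cython.cygrpc.Channel.
pickle.dumps(publisher)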

If the intent is to reuse the PublisherClient, I would recommend initializing the PublisherClient in the process method and storing it in self, using the following code.

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        # Create the client once per DoFn instance, on the worker, so it is
        # never pickled; reuse it for subsequent elements.
        if not hasattr(self, 'publisher'):
            from google.cloud import pubsub_v1
            self.publisher = pubsub_v1.PublisherClient()
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()
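A related variant, not part of the answer itself: newer Beam Python SDKs provide DoFn.setup() (and start_bundle()), which also run on the worker after deserialization and are natural places to create a client that cannot be pickled. A minimal sketch:

import apache_beam as beam


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path

    def setup(self):
        # Runs on the worker after the DoFn is deserialized, so the client
        # is never pickled.
        from google.cloud import pubsub_v1
        self.publisher = pubsub_v1.PublisherClient()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        yield future.result()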
