Using start_bundle() in apache-beam job not working. Unpickleable storage.Client()


Question

I am getting this error:

pickle.PicklingError: Pickling client objects is explicitly not supported. Clients have non-trivial state that is local and unpickleable.

When trying to use beam.ParDo to call a function that looks like this

import apache_beam as beam
from google.cloud import storage

class ExtractBlobs(beam.DoFn):
    def start_bundle(self):
        self.storageClient = storage.Client()

    def process(self, element):
        client = self.storageClient
        bucket = client.get_bucket(element)
        blobs = list(bucket.list_blobs(max_results=100))
        return blobs

I thought the whole point of start_bundle was to initialize self.someProperty and then use that self.someProperty in the 'process' method to get rid of the pickling problem (sources below). Could anyone point me in the right direction of how to solve this?
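The mechanics of why this pattern is supposed to help can be shown without Beam or GCS at all. In the sketch below, `threading.Lock()` is a stand-in for `storage.Client()` (an assumption for the example; a lock is similarly unpickleable): state created in `__init__` travels with the pickled DoFn and fails, while state created in `start_bundle` only exists after the worker has already deserialized the object.

```python
import pickle
import threading

class BadFn:
    def __init__(self):
        # Created at construction time, so pickling the DoFn tries to
        # pickle the client too and raises (locks, like GCS clients,
        # cannot be pickled).
        self.client = threading.Lock()

class GoodFn:
    def __init__(self):
        self.client = None

    def start_bundle(self):
        # Created on the worker after deserialization; never serialized.
        self.client = threading.Lock()

try:
    pickle.dumps(BadFn())
    bad_pickles = True
except TypeError:
    bad_pickles = False

good = pickle.loads(pickle.dumps(GoodFn()))
good.start_bundle()  # the client exists only after deserialization
```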

[+] What I've read:

https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191

How do I resolve a pickling error on class apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum?

Answer

UPDATED: The issue was actually a library issue. I had to have the correct apache-beam SDK version with the correct google-cloud versions:

gapic-google-cloud-pubsub-v1==0.15.4
gax-google-logging-v2==0.8.3
gax-google-pubsub-v1==0.8.3
google-api-core==1.1.2
google-api-python-client==1.6.7
google-apitools==0.5.10
google-auth==1.4.1
google-auth-httplib2==0.0.3
google-cloud-bigquery==1.1.0
google-cloud-core==0.28.1
google-cloud-datastore==1.6.0
google-cloud-pubsub==0.26.0
google-cloud-storage==1.10.0
google-gax==0.12.5
apache-beam==2.3.0

Was able to solve this by what seems to be a combination of things: first, I don't serialize anything (ugly one-liner in the yield), and second, I use threading.local()

import threading

import apache_beam as beam
from google.cloud import storage

class ExtractBlobs(beam.DoFn):
    def start_bundle(self):
        self.threadLocal = threading.local()
        self.threadLocal.client = storage.Client()

    def process(self, element):
        yield list(storage.Client().get_bucket(element).list_blobs(max_results=100))
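The per-thread caching that threading.local() provides can be sketched in isolation; here `object()` stands in for `storage.Client()` (an assumption, to keep the example self-contained and runnable). Each worker thread that touches the local lazily creates its own client, and repeated calls on the same thread reuse it:

```python
import threading

_local = threading.local()

def get_client():
    # Lazily create one "client" per thread; object() is a placeholder
    # for an unpickleable client like storage.Client().
    if not hasattr(_local, "client"):
        _local.client = object()
    return _local.client

a = get_client()
b = get_client()  # same thread: the cached instance is reused

other = []
t = threading.Thread(target=lambda: other.append(get_client()))
t.start()
t.join()

same_thread_cached = a is b              # the same object both times
new_thread_gets_own = other[0] is not a  # a fresh one on the other thread
```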

