Dataflow Error: 'Clients have non-trivial state that is local and unpickleable'
Question
I have a pipeline that I can execute locally without any errors. I used to get this error in my locally run pipeline:
'Clients have non-trivial state that is local and unpickleable.'
PicklingError: Pickling client objects is explicitly not supported.
I believe I fixed this by downgrading to apache-beam==2.3.0; after that it would run perfectly locally.
Now I am using DataflowRunner, and in the requirements.txt file I have the following dependencies:
apache-beam==2.3.0
google-cloud-bigquery==1.1.0
google-cloud-core==0.28.1
google-cloud-datastore==1.6.0
google-cloud-storage==1.10.0
protobuf==3.5.2.post1
pytz==2013.7
but I get this dreaded error again:
'Clients have non-trivial state that is local and unpickleable.'
PicklingError: Pickling client objects is explicitly not supported.
How come it's giving me the error with DataflowRunner but not DirectRunner? Shouldn't they be using the same dependencies/environment? Any help would be appreciated.
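One way to see the underlying mechanism: the runner pickles the DoFn before shipping it to workers, so a client created in `__init__` travels with the DoFn, while one created lazily does not. Below is a minimal sketch using the standard `pickle` module, with a `threading.Lock` as a stand-in for the unpickleable client object (Beam actually uses dill rather than plain pickle, and the Google Cloud clients refuse pickling for their own reasons, so this only illustrates the general idea, not the exact Dataflow code path):

```python
import pickle
import threading


class EagerDoFn:
    """Creates its (stand-in) client at construction time, so it gets pickled with the DoFn."""
    def __init__(self):
        self.client = threading.Lock()  # stand-in for an unpickleable client object


class LazyDoFn:
    """Defers client creation to start_bundle, so the pickled DoFn carries no client."""
    def __init__(self):
        self.client = None

    def start_bundle(self):
        self.client = threading.Lock()  # created on the worker, never pickled


try:
    pickle.dumps(EagerDoFn())
except TypeError as err:
    print("eager DoFn:", err)  # locks (like clients) cannot be pickled

data = pickle.dumps(LazyDoFn())  # succeeds: no client attached yet
print("lazy DoFn pickled to", len(data), "bytes")
```

The same split applies in the failing pipeline: keeping `datastore.Client()` / `storage.Client()` out of `__init__` (and out of module-level state captured by `--save_main_session`) and constructing it in `start_bundle` keeps the client off the pickling path.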
I had read that this is the way to solve it, but when I try it I still get the same error:
import apache_beam as beam
from google.cloud import datastore

class MyDoFn(beam.DoFn):
    def start_bundle(self, process_context):
        self._dsclient = datastore.Client()

    def process(self, context, *args, **kwargs):
        # do stuff with self._dsclient
        pass
From https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191
My previous reference post where I fixed this locally:
Using start_bundle() in apache-beam job not working. Unpickleable storage.Client()
Thanks in advance!
Answer
Initializing unpickleable clients in the start_bundle method is a correct approach, and Beam IOs often follow it; see datastoreio.py as an example. Here is a pipeline that does a simple operation with a GCS Python client in a DoFn. I ran it on Apache Beam 2.16.0 without issues. If you can still reproduce your issue, please provide additional details.
gcs_client.py file:
import argparse
import logging
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage


class MyDoFn(beam.DoFn):
    def start_bundle(self):
        self.storage_client = storage.Client()

    def process(self, element):
        bucket = self.storage_client.get_bucket("existing-gcs-bucket")
        blob = bucket.blob(str(int(time.time())))
        blob.upload_from_string("payload")
        return element


logging.getLogger().setLevel(logging.INFO)
_, options = argparse.ArgumentParser().parse_known_args()
pipeline_options = PipelineOptions(options)
p = beam.Pipeline(options=pipeline_options)
_ = p | beam.Create([None]) | beam.ParDo(MyDoFn())
p.run().wait_until_finish()
requirements.txt file:
google-cloud-storage==1.23.0
Command line:
python -m gcs_client \
--project=insert_your_project \
--runner=DataflowRunner \
--temp_location gs://existing-gcs-bucket/temp/ \
--requirements_file=requirements.txt \
--save_main_session