The Additional Parameters at Dataflow into Beam Pipeline


Problem Description

I am working on Dataflow and have built a custom pipeline with the Python SDK. I want to pass parameters from the Dataflow UI into my custom pipeline using Additional Parameters.


Then I changed add_argument to add_value_provider_argument, following the Google docs:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class CustomParams(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Runtime parameters, intended to be supplied from the Dataflow UI.
        parser.add_value_provider_argument(
            "--input_topic",
            type=str,
        )
        parser.add_value_provider_argument(
            "--window_size",
            type=int,
            default=5,
        )

def run():
    pipeline_options = PipelineOptions(pipeline_args, .....)
    custom_param = pipeline_options.view_as(CustomParams)
    .....
    pipeline | "Read PubSub Message" >> beam.io.ReadFromPubSub(custom_param.input_topic)

After that, I tried to create a template on GCP. The command to stage it looks like:

  python custom_pipeline.py \
    --runner DataflowRunner \
    --project YOUR_PROJECT_ID \
    --staging_location gs://YOUR_BUCKET_NAME/staging \
    --temp_location gs://YOUR_BUCKET_NAME/temp \
    --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME

But when I created the template to upload to GCS, I got an error like this

TypeError: expected string or bytes-like object

at the beam.io.ReadFromPubSub() line.

It looks like what I get back from add_value_provider_argument is a RuntimeValueProvider object, so I'm quite confused about what I have to do to fix this.

I have tried to fix this problem in ways such as:

Casting the data type

beam.io.ReadFromPubSub(str(custom_param.input_topic))

But I got this error:

ValueError: PubSub topic must be in the form "projects/<project>/topics/<topic>" (got "RuntimeValueProvider(option: input_topic, type: str, default_value: '...')").

So, does anyone have any troubleshooting advice for this? I have no idea how to go about it.

Solution

As mentioned by @mk_sta:

It seems that ReadFromPubSub module doesn't accept ValueProvider. Have you checked this Stack thread?

and as explained in that thread, ReadFromPubSub does not currently accept ValueProvider arguments, since it is implemented as a native transform in Dataflow.
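
For context, here is a minimal sketch (the names MyParams and AppendSuffix are illustrative, not from the question) of the pattern the Python SDK does support: the ValueProvider object is handed to a DoFn and only unwrapped with .get() while the job is executing, whereas ReadFromPubSub needs the topic as a plain string already at graph-construction time.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class MyParams(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Runtime parameter: its value is only known once the job runs.
        parser.add_value_provider_argument("--suffix", type=str, default="")

class AppendSuffix(beam.DoFn):
    def __init__(self, suffix):
        self.suffix = suffix  # still a ValueProvider here, not a string

    def process(self, element):
        # .get() is safe here because process() runs at execution time.
        yield element + self.suffix.get()

def run(argv=None):
    options = PipelineOptions(argv)
    params = options.view_as(MyParams)
    with beam.Pipeline(options=options) as p:
        p | beam.Create(["a", "b"]) | beam.ParDo(AppendSuffix(params.suffix))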

You can check the list of I/O methods that accept runtime parameters for ValueProvider support across the different SDKs.

So at this moment, if you switch from the Python SDK to the Java SDK, the Read transform of PubSubIO does support ValueProvider (see https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#staticvalue).
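
If switching SDKs is not an option, one possible fallback on the Python side (my own workaround, not part of the quoted answer, in the spirit of the static-value approach linked above) is to declare the topic with a plain add_argument instead of add_value_provider_argument, so its value is resolved when the template is staged and ReadFromPubSub receives an ordinary string:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class StaticParams(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Plain add_argument: the value must be passed when the template is
        # created and gets baked into the pipeline graph as a string.
        parser.add_argument("--input_topic", type=str)

def run(argv=None):
    options = PipelineOptions(argv, streaming=True)
    params = options.view_as(StaticParams)
    with beam.Pipeline(options=options) as p:
        (p
         | "Read PubSub Message" >> beam.io.ReadFromPubSub(topic=params.input_topic)
         | "Print" >> beam.Map(print))

The trade-off is that the topic can no longer be changed per job from the Dataflow UI; each topic needs its own staged template.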

