How to access pipeline options within ParDo transforms?

Problem description

TL;DR: How can I access parameters passed to the job at job creation time in my ParDo transforms?

I have two templates, one for Dev and one for Prod, and they both work fine, except that there is one value that needs to be different in each template. So far I've been "hardcoding" this value and then "running" the Java program to build the template (using the DataflowRunner runner). But this is error prone: unless I'm really, really careful, I'll update some code for the dev template and inadvertently leave the value that was set for the prod template. Not good.

So I thought Pipeline Options would be a good fit: I just pass in a different parameter, either at template compile time or even at template run time. But I am having a bear of a time accessing the value within the ParDo transform where I need it. It works fine if I use the default runner and run the pipeline locally, but when I switch over and build the template, the value is always null. I can reproduce this with the following code:

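package simplepipeline;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;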

@SuppressWarnings("serial")
public class StarterPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  static String orgId;

  public interface MyOptions extends PipelineOptions {


     @Description("Org Id")
     @Default.String("123-984-a")
     String getOrgId();
     void setOrgId( String orgID );

  }

  public static void main(String[] args) {


     PipelineOptionsFactory.register(MyOptions.class);


     final MyOptions options = PipelineOptionsFactory.fromArgs( args ).withValidation().create()
        .as( MyOptions.class );


     orgId = options.getOrgId();

     LOG.info( "orgId: " + orgId );

     Pipeline p = Pipeline.create( options );


     PCollection<String> someDataRows = p.apply("Get data from BQ", Create.of(

      "string 1", "string2", "string 3"

     ) );


     someDataRows.apply( "Package into a list", ParDo.of( new DoFn<String, String>() {

           @ProcessElement
           public void processElement( ProcessContext c ) {

              LOG.info( "Hello? " );
              LOG.info( "ORG ID: " + orgId );
           }

           }));


    p.run();
  }
}

The output in the cloud is:

 2018-09-20 (16:16:49) Hello?
 2018-09-20 (16:16:49) ORG ID: null
 2018-09-20 (16:16:51) Hello?
 2018-09-20 (16:16:51) ORG ID: null
 2018-09-20 (16:16:53) Hello?
 2018-09-20 (16:16:53) ORG ID: null
 ...

But locally:

Sep 20, 2018 4:15:32 PM simplepipeline.StarterPipeline main
INFO: orgId: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello? 
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello? 
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello? 
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47

These are the build parameters I'm using for the template:

--project=the-project
--stagingLocation=gs://staging.the-project.appspot.com/staging/
--tempLocation=gs://staging.the-project.appspot.com/temp/
--runner=DataflowRunner
--region=us-west1
--templateLocation=gs://staging.the-project.appspot.com/templates/NoobPipelineDev
--orgId=jomama47

And for the local run:

--project=the-project
--tempLocation=gs://staging.the-project.appspot.com
--orgId=jomama47

I tried passing the parameter to the job when I created it in the Dataflow console (browser), entering orgId and jomama77 in the parameters fields, but it still comes out as null.

Sorry for the long post.

Recommended answer

There are two things here. First of all, I would recommend using ValueProvider so that you can pass a different orgId as a parameter at runtime:

public interface MyOptions extends PipelineOptions {    
     @Description("Org Id")
     @Default.String("123-984-a")
     ValueProvider<String> getOrgId();
     void setOrgId(ValueProvider<String> orgID);   
}

Then read it from options with:

ValueProvider<String> orgId = options.getOrgId();
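At this point orgId is only a handle, not the value: with a template, the value does not exist until the job is launched, so get() has to be deferred until the element is processed. The stdlib-only sketch below models that idea. DeferredValue, OrgIdConsumer and DeferredDemo are hypothetical names, not Beam classes, and the sketch collapses "template build" and "job launch" into one JVM, with supply() standing in for the runner providing the parameter.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stdlib-only model of a deferred option value. This is NOT Beam's
// ValueProvider; it only mimics the idea that the value arrives after construction.
final class DeferredValue<T> implements Serializable {
    private T value;
    private boolean available;

    // Stands in for the runner supplying the template parameter at job launch.
    void supply(T v) {
        this.value = v;
        this.available = true;
    }

    boolean isAccessible() {
        return available;
    }

    T get() {
        if (!available) {
            throw new IllegalStateException("Value only available at run time");
        }
        return value;
    }
}

// Hypothetical consumer playing the role of a DoFn: it keeps the handle and
// resolves it per element instead of reading the value in its constructor.
final class OrgIdConsumer implements Serializable {
    private final DeferredValue<String> orgId;

    OrgIdConsumer(DeferredValue<String> orgId) {
        this.orgId = orgId; // store the handle only; calling get() here would throw
    }

    String process(String element) {
        return element + " / ORG ID: " + orgId.get();
    }
}

final class DeferredDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        DeferredValue<String> orgId = new DeferredValue<>();
        OrgIdConsumer fn = new OrgIdConsumer(orgId); // "template build": no value yet
        log.add("accessible at build: " + orgId.isAccessible());
        orgId.supply("jomama47");                    // "job launch" with orgId=jomama47
        log.add(fn.process("string 1"));
        return log;
    }
}
```

The real ValueProvider interface also exposes isAccessible(), which is the check to use if code might run before the runtime value is available.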

Second, for the value to be accessible within the ParDo, you can pass it as a parameter to the DoFn's constructor, as in the example in the Beam docs:

someDataRows.apply( "Package into a list", ParDo.of( new CustomFn(orgId)));

where CustomFn's constructor takes it as an argument and stores it in a ValueProvider field so that it's accessible from within the ParDo. Notice that you now need to call orgId.get() to read the value:

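static class CustomFn extends DoFn<String, String> {
    // Access options from within the ParDo: the ValueProvider is an instance field,
    // so it is serialized with the DoFn and shipped to the workers.
    private final ValueProvider<String> orgId;

    public CustomFn(ValueProvider<String> orgId) {
        this.orgId = orgId;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info("Hello? ");
        // get() is called at run time, when the template parameter is available.
        LOG.info("ORG ID: " + orgId.get());
    }
}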
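Why the constructor works where the question's static field did not: the runner serializes the DoFn instance and ships it to the workers, and Java serialization records instance fields but never static ones. Locally, the DirectRunner runs in the same JVM where main() already set the static orgId, so it looks fine; on a Dataflow worker the class is loaded fresh, main() never ran there, and the static stays null. The stdlib-only sketch below (FnLike and SerializationDemo are hypothetical names, and it only approximates how a runner serializes a DoFn) checks which value actually ends up in the serialized bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a DoFn with both kinds of field.
final class FnLike implements Serializable {
    static String staticOrgId;          // class state, like the question's static orgId
    private final String instanceOrgId; // object state, like CustomFn's constructor argument

    FnLike(String instanceOrgId) {
        this.instanceOrgId = instanceOrgId;
    }
}

final class SerializationDemo {
    // Serialize an object with standard Java serialization, then decode the bytes
    // as ISO-8859-1 (one char per byte) so ASCII strings in the stream are searchable.
    static String serializedForm(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(bytes.toByteArray(), StandardCharsets.ISO_8859_1);
    }
}
```

For example, after setting FnLike.staticOrgId = "static-jomama47" and serializing new FnLike("ctor-jomama47"), the stream contains "ctor-jomama47" but not "static-jomama47". Deserializing in the same JVM would still "see" the static value, since statics belong to the loaded class rather than the object, which is exactly why the local run was misleading.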

Now you can stage the template and call it with:

gcloud dataflow jobs run $JOB_NAME \
    --gcs-location gs://$BUCKET/templates/$TEMPLATE_NAME \
    --parameters orgId=jomama47

This should now work as expected.
