Error creating Dataflow template with TextIO and ValueProvider


Question

I am trying to create a Google Dataflow template, but I can't find a way to do it without producing the following exception:

WARNING: Size estimation of the source failed: RuntimeValueProvider{propertyName=inputFile, default=null}
java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=inputFile, default=null}
        at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:234)
        at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:218)
        at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:78)
        at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:53)
        at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:40)
        at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:37)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
        at org.apache.beam.examples.MyMinimalWordCount.main(MyMinimalWordCount.java:69)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
        at java.lang.Thread.run(Thread.java:748)

I can reproduce it with a simple modified version of the MinimalWordCount example from Beam.

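package org.apache.beam.examples;

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class MyMinimalWordCount {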

    public interface WordCountOptions extends PipelineOptions {
        @Description("Path of the file to read from")
        ValueProvider<String> getInputFile();

        void setInputFile(ValueProvider<String> valueProvider);
    }

    public static void main(String[] args) {

        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(WordCountOptions.class);

        Pipeline p = Pipeline.create(options);

        p.apply(TextIO.read().from(options.getInputFile()))

                .apply(FlatMapElements
                        .into(TypeDescriptors.strings())
                        .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
                .apply(Filter.by((String word) -> !word.isEmpty()))
                .apply(Count.perElement())
                .apply(MapElements
                        .into(TypeDescriptors.strings())
                        .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
                .apply(TextIO.write().to("wordcounts"));

        // Having the waitUntilFinish causes a NPE when trying to create a dataflow template
        //p.run().waitUntilFinish();

        p.run();
    }
}

I can run the example locally with:

mvn compile exec:java \
     -Pdirect-runner \
     -Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
     -Dexec.args="--inputFile=pom.xml " 

It also runs on Google Dataflow with:

mvn compile exec:java \
     -Pdataflow-runner \
     -Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
     -Dexec.args="--runner=DataflowRunner \
                  --project=[project] \
                  --inputFile=gs://[bucket]/input.csv "

But when I try to create a Google Dataflow template with the following, I get the error:

mvn compile exec:java \
     -Pdataflow-runner \
     -Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
     -Dexec.args="--runner=DataflowRunner \
                  --project=[project] \
                  --stagingLocation=gs://[bucket]/staging \
                  --templateLocation=gs://[bucket]/templates/MyMinimalWordCountTemplate " 

The other confusing thing is that the Maven build continues and ends with BUILD SUCCESS.

So my questions are:

Q1) Should I be able to create a Google Dataflow template like this (using ValueProviders to provide TextIO input at runtime)?

Q2) Is the exception during the build a real error or just a WARNING, as the logging seems to indicate?

Q3) If the answers to Q1 and Q2 are "yes" and "just a warning", and I try to create a job from the uploaded template, why does it not have any metadata or know about my input options?

References I used:

  • https://cloud.google.com/dataflow/docs/templates/creating-templates
  • https://beam.apache.org/get-started/quickstart-java/
  • https://beam.apache.org/documentation/runners/dataflow/#setup

Recommended answer

The correct answer is that you do not have to supply an input when creating the template; the input is taken as a value at runtime. The exception is an internal issue in Google Dataflow that should be removed in the future.
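To illustrate why the exception shows up only as a WARNING, here is a minimal sketch in plain Java. It is a toy model, not Beam's actual code: Provider, RuntimeProvider, bind and estimateSizeOrUnknown are hypothetical names invented for this example. It assumes, as the log output in the question suggests, that size estimation is a best-effort step whose failure is logged and then ignored.

```java
public class ValueProviderSketch {

    /** Toy stand-in for a value that may only exist once the job is launched. */
    interface Provider<T> {
        T get();
    }

    /** Hypothetical model of a runtime-only option such as --inputFile in a template. */
    static final class RuntimeProvider<T> implements Provider<T> {
        private final String propertyName;
        private T runtimeValue; // stays null while the template is being created

        RuntimeProvider(String propertyName) {
            this.propertyName = propertyName;
        }

        /** Simulates launching a job from the template with a concrete parameter. */
        void bind(T value) {
            this.runtimeValue = value;
        }

        @Override
        public T get() {
            if (runtimeValue == null) {
                throw new IllegalStateException(
                        "Value only available at runtime, but accessed from a non-runtime context: "
                                + propertyName);
            }
            return runtimeValue;
        }
    }

    /**
     * Best-effort size estimate. A failure is reported and swallowed,
     * so pipeline construction carries on with an "unknown" size of -1.
     */
    static long estimateSizeOrUnknown(Provider<String> inputFile) {
        try {
            // Placeholder "size": the length of the path string, not a real file size.
            return inputFile.get().length();
        } catch (IllegalStateException e) {
            System.out.println("WARNING: Size estimation of the source failed: " + e.getMessage());
            return -1L;
        }
    }

    public static void main(String[] args) {
        RuntimeProvider<String> inputFile = new RuntimeProvider<>("inputFile");

        // Template creation: no value yet, so the estimate fails and only a warning is printed.
        System.out.println("estimate at template creation: " + estimateSizeOrUnknown(inputFile));

        // Job launch: the parameter is bound, so the same call now succeeds.
        inputFile.bind("gs://[bucket]/input.csv");
        System.out.println("estimate at job launch: " + estimateSizeOrUnknown(inputFile));
    }
}
```

In this model the first call prints the warning and returns -1 without aborting, which mirrors the Maven build finishing with BUILD SUCCESS even though a stack trace was logged.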
