Apache Beam IllegalArgumentException on Google Dataflow with message `Not expecting a splittable ParDoSingle: should have been overridden`

Question

I am trying to write a pipeline that periodically checks a Google Cloud Storage bucket for new .gz files, which are actually compressed .csv files, and then writes their records to a BigQuery table. The following code worked in batch mode before I added the .watchForNewFiles(...) and .withMethod(STREAMING_INSERTS) parts. I expected it to run in streaming mode with those changes. However, I am getting an exception that I can't find anything about on the web. Here is my code:

public static void main(String[] args) {       

    DataflowDfpOptions options = PipelineOptionsFactory.fromArgs(args)
            //.withValidation()
            .as(DataflowDfpOptions.class);

    Pipeline pipeline = Pipeline.create(options);

    Stopwatch sw = Stopwatch.createStarted();
    log.info("DFP data transfer from GS to BQ has started.");

    pipeline.apply("ReadFromStorage", TextIO.read()
            .from("gs://my-bucket/my-folder/*.gz")
            .withCompression(Compression.GZIP)
            .watchForNewFiles(
                    // Check for new files every 30 seconds
                    Duration.standardSeconds(30),
                    // Never stop checking for new files
                    Watch.Growth.never()
            )
    )
            .apply("TransformToTableRow", ParDo.of(new TableRowConverterFn()))
            .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                    .to(options.getTableId())
                    .withMethod(STREAMING_INSERTS)
                    .withCreateDisposition(CREATE_NEVER)
                    .withWriteDisposition(WRITE_APPEND)
                    .withSchema(TableSchema)); //todo: use withJsonScheme(String json) method instead

    pipeline.run().waitUntilFinish();

    log.info("DFP data transfer from GS to BQ is finished in {} seconds.", sw.elapsed(TimeUnit.SECONDS));
}

/**
 * Creates a TableRow from a CSV line
 */
private static class TableRowConverterFn extends DoFn<String, TableRow> {

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {

        String[] split = c.element().split(",");

        //Ignore the header line
        //Since this is going to be run in parallel, we can't guarantee that the first line passed to this method will be the header
        if (split[0].equals("Time")) {
            log.info("Skipped header");
            return;
        }

        TableRow row = new TableRow();
        for (int i = 0; i < split.length; i++) {
            TableFieldSchema col = TableSchema.getFields().get(i);

            //String is the most common type, putting it in the first if clause for a little bit optimization.
            if (col.getType().equals("STRING")) {
                row.set(col.getName(), split[i]);
            } else if (col.getType().equals("INTEGER")) {
                row.set(col.getName(), Long.valueOf(split[i]));
            } else if (col.getType().equals("BOOLEAN")) {
                row.set(col.getName(), Boolean.valueOf(split[i]));
            } else if (col.getType().equals("FLOAT")) {
                row.set(col.getName(), Float.valueOf(split[i]));
            } else {
                //Otherwise, simply try to write it as a String
                //todo: Consider other BQ data types.
                row.set(col.getName(), split[i]);
            }
        }
        c.output(row);
    }
}

And the stack trace:

java.lang.IllegalArgumentException: Not expecting a splittable ParDoSingle: should have been overridden
    at org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle(PrimitiveParDoSingleFactory.java:167)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate(PrimitiveParDoSingleFactory.java:145)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:206)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform(SdkComponents.java:86)
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.visitPrimitiveTransform(PipelineTranslation.java:87)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto(PipelineTranslation.java:59)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:165)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:684)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:173)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.diply.data.App.main(App.java:66)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)

Here is the command I use to publish the job on Dataflow:

clean compile exec:java -Dexec.mainClass=com.my.project.App "-Dexec.args=--runner=DataflowRunner --tempLocation=gs://my-bucket/tmp --tableId=Temp.TestTable --project=my-project --jobName=dataflow-dfp-streaming" -Pdataflow-runner

I use Apache Beam version 2.5.0. Here is the relevant section from my pom.xml:

 <properties>
   <beam.version>2.5.0</beam.version>
   <bigquery.version>v2-rev374-1.23.0</bigquery.version>
   <google-clients.version>1.23.0</google-clients.version>
   ...
 </properties>

Recommended answer

Running the code with Dataflow 2.4.0 gives a more explicit error: java.lang.UnsupportedOperationException: DataflowRunner does not currently support splittable DoFn

However, this answer suggests that this has been supported since 2.2.0. This is indeed the case, and following this remark, you need to add the --streaming option to your -Dexec.args to force the job into streaming mode.
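If you would rather not depend on everyone remembering the flag, one option is to add it in code before the options are parsed. The sketch below is only an illustration: StreamingArgs and ensureStreaming are hypothetical names, not part of Beam. It assumes the resulting array is passed to PipelineOptionsFactory.fromArgs(...) in place of the raw args, as in the question's main method. Beam's StreamingOptions.setStreaming(true) should achieve the same thing on the parsed options, but I have not tested that variant here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of Beam): guarantees a --streaming flag in the argument list. */
final class StreamingArgs {

    private StreamingArgs() {}

    /** Returns a copy of args with "--streaming" appended unless a streaming flag is already present. */
    static String[] ensureStreaming(String[] args) {
        List<String> result = new ArrayList<>(Arrays.asList(args));
        for (String arg : args) {
            // Leave an explicit choice untouched, e.g. --streaming, --streaming=true or --streaming=false.
            if (arg.equals("--streaming") || arg.startsWith("--streaming=")) {
                return result.toArray(new String[0]);
            }
        }
        result.add("--streaming");
        return result.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] effective = ensureStreaming(args);
        // In the pipeline from the question, this array would be handed to
        // PipelineOptionsFactory.fromArgs(effective) instead of the raw args.
        System.out.println(String.join(" ", effective));
    }
}
```

With this in place, the original -Dexec.args from the question would not need to change, and an explicit --streaming=false on the command line is still respected.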

I tested it with the code I supplied in the comments, using both your pom and mine. In both cases it (1) produces your error without --streaming and (2) runs fine with --streaming.

You might want to open a Beam issue on GitHub, since as far as I know this behavior is not officially documented anywhere.
