Apache Beam Java SDK SparkRunner write to parquet error

Question

I'm using Apache Beam with Java. I'm trying to read a CSV file and write it to Parquet format using the SparkRunner on a pre-deployed Spark env, using local mode. Everything worked fine with the DirectRunner, but the SparkRunner simply won't work. I'm using the Maven Shade plugin to build a fat jar.

The code:

Java:

public class ImportCSVToParquet {
        // ... omitted ...
        File csv = new File(filePath);
        // Read the CSV file line by line.
        PCollection<String> vals = pipeline.apply(TextIO.read().from(filePath));

        String parquetFilename = csv.getName().replaceFirst("csv", "parquet");
        String outputLocation = FolderConventions.getRawFilePath(confETL.getHdfsRoot(), parquetFilename);

        // Convert each line into a GenericRecord using the transaction Avro schema.
        PCollection<GenericRecord> processed = vals.apply(ParDo.of(new ProcessFiles.GenericRecordFromCsvFn()))
                .setCoder(AvroCoder.of(new Config().getTransactionSchema()));

        LOG.info("Processed file will be written to: " + outputLocation);
        // Write the records as Parquet files via FileIO + ParquetIO.sink.
        processed.apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(conf.getTransactionSchema())).to(outputLocation));

        pipeline.run().waitUntilFinish();
}

POM dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-spark</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-parquet</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.3</version>
    </dependency>
</dependencies>

Spark-submit script:

spark-submit \
--class package.ImportCSVToParquet \
--master local[*] \
--executor-cores 2 \
--executor-memory 2g \
--driver-memory 2g \
--driver-cores 2 \
--conf spark.sql.codegen.wholeStage=false \
--conf spark.wholeStage.codegen=false \
--conf spark.sql.shuffle.partitions=2005 \
--conf spark.driver.maxResultSize=2g \
--conf spark.executor.memoryOverhead=4048 \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35" \
--conf "spark.driver.extraJavaOptions=-Djava.io.tmpdir=/path-to-tmp/" \
--conf "spark.driver.extraClassPath=./" \
--jars path-to-jar \
/path-to-jar "$@"

I'm getting the following error:

2019-08-07 13:37:49 ERROR Executor:91 - Exception in task 3.0 in stage 0.0 (TID 3)
org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchMethodError: org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(Lorg/apache/parquet/io/OutputFile;)V
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        at org.apache.beam.sdk.io.WriteFiles$WriteUnshardedTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:214)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:176)
        at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:137)
        at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
        at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
        at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(Lorg/apache/parquet/io/OutputFile;)V
        at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:162)
        at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:153)
        at org.apache.parquet.avro.AvroParquetWriter.builder(AvroParquetWriter.java:43)
        at org.apache.beam.sdk.io.parquet.ParquetIO$Sink.open(ParquetIO.java:304)
        at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.prepareWrite(FileIO.java:1359)
        at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:937)
        at org.apache.beam.sdk.io.WriteFiles$WriteUnshardedTempFilesFn.processElement(WriteFiles.java:533)

It seems that the job does the reading and transformations, but fails when it tries to write to the filesystem. I'm not using HDFS at the moment. Any ideas?

Answer

I am certain that ParquetIO depends on the Parquet 1.10+ release, which added a "hadoop-neutral" API to the Parquet file readers/writers.

Spark 2.2.3 depends on Parquet 1.8.2, which does not have the builder(...) constructor that Beam's ParquetIO uses, which is confirmed by the exception.
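
If you want to confirm the clash yourself, one way is to compare the Parquet jars Spark ships with the Parquet version your project resolves. This is only a sketch; it assumes a standard Spark binary distribution under $SPARK_HOME and a Maven build:

# Parquet jars bundled with the Spark distribution (for Spark 2.2.3, expect 1.8.x here)
ls "$SPARK_HOME"/jars/ | grep parquet

# Parquet version pulled in transitively by beam-sdks-java-io-parquet (expect 1.10.x)
mvn dependency:tree -Dincludes=org.apache.parquet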

If possible, the simplest solution would be to upgrade to Spark 2.4, which bumped the Parquet version to 1.10.0.

If you can't upgrade Spark versions, there are a couple of techniques for overriding the jars brought in by Spark (see the sketch after this list):

  1. You can set spark.(driver|executor).userClassPathFirst to true, which will place the classes in your fat jar ahead of the jars provided by Spark. This might work, or it might introduce new dependency conflicts.

  2. You can try replacing the parquet-xx-1.8.2.jar files in your local Spark installation with parquet-xx-1.10.0 (assuming that they are drop-in replacements). If this works, you can apply the same strategy to a Spark job on a cluster by setting the spark.yarn.jars property when submitting the job.

  3. You can try shading the Beam ParquetIO and its Parquet dependencies in your fat jar.
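
For illustration, a minimal sketch of the first two options. It reuses the placeholders from the spark-submit script above, and the exact Parquet jar names and versions in your Spark distribution are an assumption you should verify first:

# Option 1: put the fat jar's classes ahead of the jars provided by Spark
spark-submit \
--class package.ImportCSVToParquet \
--master local[*] \
--conf spark.driver.userClassPathFirst=true \
--conf spark.executor.userClassPathFirst=true \
/path-to-jar "$@"

# Option 2: swap the Parquet jars shipped with Spark for 1.10.0 equivalents
ls "$SPARK_HOME"/jars/parquet-*.jar    # see what is there (1.8.x jars for Spark 2.2.3)
# Back up the 1.8.x jars, then drop in the matching 1.10.0 jars from Maven Central.
# On a YARN cluster, point spark.yarn.jars at the patched jar set instead.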

Edit: This is a known issue, BEAM-5164.

Edit (workaround):

I managed to get this to work for Spark 2.2.3 by following the instructions, with some modifications:

  • I used the Scala 2.11 dependencies and set them to <scope>provided</scope> (probably optional).

  • I added the following three relocations to the maven-shade-plugin:

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <configuration>
          <createDependencyReducedPom>false</createDependencyReducedPom>
          <filters>

... unchanged ...

          </filters>
          <relocations>
            <relocation>
              <pattern>org.apache.parquet</pattern>
              <shadedPattern>shaded.org.apache.parquet</shadedPattern>
            </relocation>
            <!-- Some packages are shaded already, and on the original spark classpath. Shade them more. -->
            <relocation>
              <pattern>shaded.parquet</pattern>
              <shadedPattern>reshaded.parquet</shadedPattern>
            </relocation>
            <relocation>
              <pattern>org.apache.avro</pattern>
              <shadedPattern>shaded.org.apache.avro</shadedPattern>
            </relocation>
          </relocations>
        </configuration>
        <executions>

... unchanged ...

        </executions>
      </plugin>
    </plugins>
  </build>
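
After rebuilding, a quick sanity check is to confirm that the Parquet classes inside the fat jar really ended up under the relocated package. This is only a sketch; the jar path below is a placeholder for your own artifact:

mvn clean package
jar tf target/your-fat-jar.jar | grep 'shaded/org/apache/parquet' | head

If the relocated classes show up and the job runs, the NoSuchMethodError should be gone, since the shaded Parquet 1.10 classes no longer collide with Spark's Parquet 1.8.2 on the executor classpath.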
