Enabling the Directory Committer in Spark


Question

I am trying to use the S3A Partitioned committer (or the Directory committer, as I just need to confirm that the committer is working as expected) with Spark. I am following this link, based on which it should be pretty simple, but I keep running into new issues while resolving the previous one.

The code used for testing (inside spark-shell) is:

val sourceDF = spark.range(0, 10000)
val datasets = "s3a://bucket-name/test"
sourceDF.write.format("orc").save(datasets + "orc")

spark-defaults.conf is:

spark.hadoop.fs.s3a.committer.name directory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
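
As a quick sanity check (a sketch, assuming the standard spark session object that spark-shell provides), you can confirm from the shell that these settings were actually picked up:

scala> spark.conf.get("spark.sql.sources.commitProtocolClass", "unset")    // should print ...cloud.PathOutputCommitProtocol
scala> spark.conf.get("spark.sql.parquet.output.committer.class", "unset")
scala> spark.sparkContext.hadoopConfiguration.get("fs.s3a.committer.name") // "directory" once the spark.hadoop. prefix has been stripped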


Error 1:
scala> sourceDF.write.format("orc").save(datasets + "orc")
java.lang.NoClassDefFoundError: org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:144)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:98)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 81 more

Then I copied spark-hadoop-cloud_2.11-2.3.1.3.0.2.0-50.jar from this link into the spark/jars folder.

This resolved the previous `NoClassDefFoundError` but produced a new class-definition error, which is:

Error 2:

java.lang.NoClassDefFoundError: org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:144)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:98)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
....

The full stack trace can be pasted if needed.

After this, I copied hadoop-mapreduce-client-core-3.1.1.jar into the spark/jars folder and ran the test code in spark-shell again. This time I got the error below:

Error 3 (and the final error, where I am stuck):

scala> sourceDF.write.format("orc").save(datasets + "orc")
java.lang.NoSuchMethodError: org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.<init>(Ljava/lang/String;Ljava/lang/String;Z)V
at org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.<init>(PathOutputCommitProtocol.scala:60)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:98)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
... 48 elided

This looks like an incorrect-jar issue, but I am not able to find the correct one. This question is similar to a previous question, but I could not find a relevant answer there, hence posting again.
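
A NoSuchMethodError like this usually means the spark-hadoop-cloud jar was compiled against a different Spark build than the one running, so the HadoopMapReduceCommitProtocol constructor it calls, (String, String, Boolean), does not exist on the classpath. As a diagnostic sketch, the constructors the running Spark actually exposes can be listed from spark-shell and compared with that signature:

scala> classOf[org.apache.spark.internal.io.HadoopMapReduceCommitProtocol].getConstructors.foreach(println)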

Answer

All of the new-committer configuration documentation I have read to date is missing one fundamental fact:

They promise that those cloud-integration libraries will be bundled with Spark 3.0.0, but for now you have to add the libraries yourself.

Under the cloud-integration Maven repos there are multiple distributions supporting the committers; I found one that works with the directory committer.
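
Once a matching distribution is on the classpath (for example dropped into spark/jars, or added with --jars or --packages when launching spark-shell), a simple sanity check is to confirm that all three classes involved here resolve before re-running the write:

scala> Class.forName("org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter")       // Hadoop side (hadoop-mapreduce-client-core)
scala> Class.forName("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")      // Spark side (spark-hadoop-cloud)
scala> Class.forName("org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")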

Please also remember that the directory committer requires a shared filesystem, such as HDFS or NFS (we use AWS EFS), to coordinate the Spark workers' writes to S3.
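
For illustration only (the property name comes from the Hadoop S3A committer documentation; the mount point below is a hypothetical example, assuming a shared EFS/NFS mount available at the same path on every node), the cluster-side temporary path used by the staging/directory committers can be pointed at such a mount in spark-defaults.conf:

spark.hadoop.fs.s3a.committer.staging.tmp.path file:///mnt/efs/s3a-staging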

If it is not set up properly, the resulting S3 write location will contain an empty folder with a single _SUCCESS file in it.

If your committer is set up properly, the _SUCCESS file will contain the JSON status of the write. If it is empty (zero length), you are still using the default Spark S3 committer.
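
A minimal way to check this from spark-shell, reusing the datasets value from the question above (so the output location is datasets + "orc"):

scala> import org.apache.hadoop.fs.Path
scala> val success = new Path(datasets + "orc/_SUCCESS")
scala> val fs = success.getFileSystem(spark.sparkContext.hadoopConfiguration)
scala> fs.getFileStatus(success).getLen                   // 0 bytes means the default committer wrote it
scala> spark.read.textFile(success.toString).show(false)  // prints the JSON status when the new committer ran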

