How to use Delta Lake with spark-shell?
Question
I'm trying to write a Spark DataFrame as a Delta table. It works fine in my IDE (IntelliJ), but with the same dependencies and versions it fails in my Spark REPL (spark-shell).
Spark version: 2.4.0, Scala version: 2.11.8
Dependencies in IntelliJ (dependencies for the whole project; kindly ignore the irrelevant ones):
compile 'org.scala-lang:scala-library:2.11.8'
compile 'org.scala-lang:scala-reflect:2.11.8'
compile 'org.scala-lang:scala-compiler:2.11.8'
compile 'org.scala-lang.modules:scala-parser-combinators_2.11:1.1.2'
compile 'org.scala-lang.modules:scala-swing_2.11:2.0.3'
compile 'org.apache.spark:spark-mllib_2.11:2.4.0'
compile 'org.apache.spark:spark-sql_2.11:2.4.0'
compile 'org.apache.spark:spark-graphx_2.11:2.4.0'
compile 'org.apache.spark:spark-launcher_2.11:2.4.0'
compile 'org.apache.spark:spark-catalyst_2.11:2.4.0'
compile 'org.apache.spark:spark-streaming_2.11:2.4.0'
compile group: 'io.delta', name: 'delta-core_2.11', version: '0.5.0'
compile 'org.apache.spark:spark-core_2.11:2.4.0'
compile 'org.apache.spark:spark-hive_2.11:2.4.0'
compile 'com.databricks:spark-avro_2.11:4.0.0'
compile 'org.apache.avro:avro-mapred:1.8.2'
compile 'org.apache.avro:avro:1.8.2'
compile 'org.apache.avro:avro-compiler:1.8.2'
compile group: 'mysql', name: 'mysql-connector-java', version: '8.0.15'
compile group: 'commons-io', name: 'commons-io', version: '2.5'
testCompile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.26'
testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'org.scalatest', name: 'scalatest_2.12', version: '3.2.0-SNAP10'
compile group: 'javax.mail', name: 'javax.mail-api', version: '1.6.2'
compile group: 'com.sun.mail' ,name: 'javax.mail', version: '1.6.0'
compile 'com.hortonworks:shc-core:1.1.1-2.1-s_2.11'
compile 'com.hortonworks:shc:1.1.1-2.1-s_2.11'
compile group: 'org.apache.hbase', name: 'hbase-client', version: '1.2.5'
compile group: 'org.apache.hbase', name: 'hbase-server', version: '1.2.5'
compile group: 'org.apache.hbase', name: 'hbase-common', version: '1.2.5'
compile group: 'org.apache.hbase', name: 'hbase', version: '1.2.5', ext: 'pom'
compile group: 'org.apache.hbase', name: 'hbase-protocol', version: '1.2.5'
compile group: 'org.apache.hbase', name: 'hbase-hadoop2-compat', version: '1.2.5'
compile group: 'org.apache.hbase', name: 'hbase-annotations', version: '1.2.5'
// jackson modules
compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.8.6'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.10.0'
compile group: 'org.codehaus.jackson', name: 'jackson-core-asl', version: '1.9.13'
compile group: 'org.codehaus.jackson', name: 'jackson-mapper-asl', version: '1.9.13'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.8.7'
compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.8.6'
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.8.6'
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-jaxb-annotations', version: '2.8.6'
compile group: 'org.json4s', name: 'json4s-jackson_2.11', version: '3.2.10'
compile group: 'com.twitter', name: 'parquet-jackson', version: '1.6.0'
compile group: 'org.codehaus.jackson', name: 'jackson-jaxrs', version: '1.9.13'
compile group: 'org.codehaus.jackson', name: 'jackson-xc', version: '1.9.13'
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-paranamer', version: '2.8.6'
compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-annotations', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-auth', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-yarn-common', version: '2.7.3'
Code snippet I'm trying to execute:
import io.delta._
val dF = spark.read.load("path") // parquet file
dF.write.format("delta").mode("overwrite").partitionBy("topic","partition","key").save("path") // delta table
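As a quick sanity check after the write succeeds (the path here is the same placeholder as above), the Delta output can be read back in the same session:

```scala
// Assumes a running SparkSession `spark` (as in spark-shell) and that
// "path" points at the Delta table written above -- placeholder path.
val deltaDF = spark.read.format("delta").load("path")
deltaDF.printSchema() // should match the original parquet schema
deltaDF.count()       // row count of the written table
```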
spark-shell command used:
spark-shell --packages com.fasterxml.jackson.core:jackson-databind:2.8.6,com.fasterxml.jackson.core:jackson-core:2.10.0,org.codehaus.jackson:jackson-core-asl:1.9.13,org.codehaus.jackson:jackson-mapper-asl:1.9.13,com.fasterxml.jackson.core:jackson-annotations:2.8.7,com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.8.6,com.fasterxml.jackson.module:jackson-module-scala_2.11:2.8.6,com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.8.6,com.twitter:parquet-jackson:1.6.0,org.codehaus.jackson:jackson-jaxrs:1.9.13,org.codehaus.jackson:jackson-xc:1.9.13,com.fasterxml.jackson.module:jackson-module-paranamer:2.8.6,io.delta:delta-core_2.11:0.5.0,commons-io:commons-io:2.5
Error in the REPL:
Exception in thread "main" java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse$default$3()Z
at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:127)
at org.apache.spark.sql.delta.actions.Metadata$$anonfun$schema$1.apply(actions.scala:202)
at org.apache.spark.sql.delta.actions.Metadata$$anonfun$schema$1.apply(actions.scala:201)
at scala.Option.map(Option.scala:146)
at org.apache.spark.sql.delta.actions.Metadata.schema$lzycompute(actions.scala:201)
at org.apache.spark.sql.delta.actions.Metadata.schema(actions.scala:200)
at org.apache.spark.sql.delta.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:61)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.updateMetadata(WriteIntoDelta.scala:45)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:85)
at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:65)
at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:396)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:133)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
at org.controller.deltaLakeEG.deltaLakeHadoopEg$.main(deltaLakeHadoopEg.scala:29)
at org.controller.deltaLakeEG.deltaLakeHadoopEg.main(deltaLakeHadoopEg.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Answer
According to the Delta Lake documentation:

Delta Lake requires Apache Spark version 2.4.2 or above

Please upgrade your Spark version to at least 2.4.2 in IntelliJ IDEA (otherwise issues like this show up). The latest version as of this writing is 2.4.4.

Per the official documentation, run spark-shell with the Delta Lake package:

bin/spark-shell --packages io.delta:delta-core_2.11:0.5.0

From myself: also use --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension to enable Delta Lake's SQL commands, e.g. DESCRIBE DETAIL, GENERATE. The entire command to run spark-shell with Delta Lake 0.5.0 should therefore be as follows:

./bin/spark-shell \
  --packages io.delta:delta-core_2.11:0.5.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
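Once the shell comes up with the extension configured, the Delta SQL commands mentioned above become available; a short sketch of using them (the table path is a placeholder for your own Delta output):

```scala
// Inside a spark-shell launched with the DeltaSparkSessionExtension;
// "/out/path" is a placeholder for the Delta table location.
spark.sql("DESCRIBE DETAIL delta.`/out/path`").show(truncate = false)

// GENERATE builds a symlink manifest for the table (e.g. for external
// query engines); supported for path-based Delta tables in 0.5.0.
spark.sql("GENERATE symlink_format_manifest FOR TABLE delta.`/out/path`")
```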