Error when connecting Spark Structured Streaming + Kafka
Question
I'm trying to connect my Structured Streaming application (Spark 2.4.5) to Kafka, but every time I run it I get this Data Source Provider error. Here are my Scala code and my sbt build:
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object streaming_app_demo {
  def main(args: Array[String]): Unit = {
    println("Spark Structured Streaming with Kafka Demo Application Started ...")

    val KAFKA_TOPIC_NAME_CONS = "test"
    val KAFKA_OUTPUT_TOPIC_NAME_CONS = "test"
    val KAFKA_BOOTSTRAP_SERVERS_CONS = "localhost:9092"

    val spark = SparkSession.builder
      .master("local[*]")
      .appName("Spark Structured Streaming with Kafka Demo")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // Stream from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
      .option("subscribe", KAFKA_TOPIC_NAME_CONS)
      .option("startingOffsets", "latest")
      .load()

    val ds = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "test2")
      .start()
  }
}
The error is:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
at streaming_app_demo$.main(teste.scala:29)
at streaming_app_demo.main(teste.scala)
And my build.sbt is:
name := "scala_212"
version := "0.1"
scalaVersion := "2.12.11"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5" % "provided"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.5.0"
Thanks!
Answer
For Spark Structured Streaming + Kafka, the spark-sql-kafka-0-10 library is required.
You are getting this org.apache.spark.sql.AnalysisException: Failed to find data source: kafka exception because the spark-sql-kafka library is not available on your classpath, so Spark is unable to find org.apache.spark.sql.sources.DataSourceRegister inside the META-INF/services folder.
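In the build shown above, spark-sql-kafka-0-10 is scoped as "provided", which keeps it off the runtime classpath when the app is launched from sbt or an IDE rather than via spark-submit on a cluster that already ships the jar. That matches the exception exactly. A minimal sketch of a corrected build.sbt for running locally (dropping the provided scope, and dropping the explicit kafka-clients pin, since spark-sql-kafka-0-10 pulls in a compatible kafka-clients transitively) would be:

```scala
name := "scala_212"

version := "0.1"

scalaVersion := "2.12.11"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
// No "provided" scope: the Kafka source must be on the runtime classpath
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5"
```

Keep the provided scope only if you submit to a cluster whose distribution already bundles this jar.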
The DataSourceRegister path inside the jar file:
/org/apache/spark/spark-sql-kafka-0-10_2.11/2.2.0/spark-sql-kafka-0-10_2.11-2.2.0.jar!/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
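To confirm that a given jar actually carries this service registration, you can print the entry directly. The jar path below is only an example taken from a local Ivy cache for Scala 2.12 / Spark 2.4.5; substitute your own path and versions:

```shell
# Print the service registration file; it should contain the
# Kafka source provider class, org.apache.spark.sql.kafka010.KafkaSourceProvider
unzip -p ~/.ivy2/cache/org.apache.spark/spark-sql-kafka-0-10_2.12/jars/spark-sql-kafka-0-10_2.12-2.4.5.jar \
  META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
```

If the entry is missing from your final assembled jar, the merge strategy below is the usual culprit.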
Update
If you are using SBT with the sbt-assembly plugin, try adding the code block below. This will include the org.apache.spark.sql.sources.DataSourceRegister file in your final jar.
// META-INF discarding, but keep service registrations merged
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
  case "application.conf"                        => MergeStrategy.concat
  case _                                         => MergeStrategy.first
}
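As an alternative to bundling the Kafka source into your own jar, spark-submit can fetch the integration artifact at launch time with --packages. The coordinates below assume Scala 2.12 and Spark 2.4.5 to match the build above, and the jar path is sbt's default output location; adjust both for your setup:

```shell
# --packages resolves spark-sql-kafka-0-10 and its transitive dependencies
# (including kafka-clients) from Maven Central and adds them to the
# driver and executor classpaths.
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5 \
  --class streaming_app_demo \
  target/scala-2.12/scala_212_2.12-0.1.jar
```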