Error when connecting spark structured streaming + kafka


Problem Description

I'm trying to connect my Structured Streaming Spark 2.4.5 application with Kafka, but every time I run it this Data Source Provider error appears. Here are my Scala code and my sbt build:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object streaming_app_demo {
  def main(args: Array[String]): Unit = {

    println("Spark Structured Streaming with Kafka Demo Application Started ...")

    val KAFKA_TOPIC_NAME_CONS = "test"
    val KAFKA_OUTPUT_TOPIC_NAME_CONS = "test"
    val KAFKA_BOOTSTRAP_SERVERS_CONS = "localhost:9092"


    val spark = SparkSession.builder
      .master("local[*]")
      .appName("Spark Structured Streaming with Kafka Demo")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // Stream from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
      .option("subscribe", KAFKA_TOPIC_NAME_CONS)
      .option("startingOffsets", "latest")
      .load()

    val ds = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "test2")
      .start()
  }
}

The error is:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
    at streaming_app_demo$.main(teste.scala:29)
    at streaming_app_demo.main(teste.scala)

And my build.sbt is:

name := "scala_212"

version := "0.1"

scalaVersion := "2.12.11"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5" % "provided"

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.5.0"

Thanks!

Answer

For Spark Structured Streaming + Kafka, the spark-sql-kafka-0-10 library is required.
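In your build.sbt it is declared with "provided" scope, which keeps it off the runtime classpath when you launch the app with sbt run or from an IDE. A minimal sketch of the dependency block without that scope (an assumption: you run locally, rather than on a cluster where the connector jar is already deployed):

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"

// no "provided" scope here, so the kafka source/sink is on the classpath at run time
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5"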

You are getting the org.apache.spark.sql.AnalysisException: Failed to find data source: kafka exception because the spark-sql-kafka library is not available on your classpath, so Spark cannot find org.apache.spark.sql.sources.DataSourceRegister inside the META-INF/services folder.
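Alternatively, if you deploy with spark-submit (the "deployment section" the exception message points to), the connector can be pulled in at submit time with --packages. A sketch assuming Scala 2.12 and Spark 2.4.5; the jar path is a placeholder for your own artifact:

spark-submit \
  --class streaming_app_demo \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5 \
  target/scala-2.12/your-application.jar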

Path of DataSourceRegister inside the jar file:

/org/apache/spark/spark-sql-kafka-0-10_2.11/2.2.0/spark-sql-kafka-0-10_2.11-2.2.0.jar!/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister


Update

If you are using SBT with sbt-assembly, try adding the code block below. It will keep the org.apache.spark.sql.sources.DataSourceRegister file in your final assembled jar.

// META-INF discarding
assemblyMergeStrategy in assembly := {
  case PathList("META-INF","services",xs @ _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF",xs @ _*) => MergeStrategy.discard
  case "application.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}
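The merge strategy above belongs to the sbt-assembly plugin, so it has to be enabled in your project; a minimal project/plugins.sbt sketch (the plugin version is an assumption, pick one that matches your sbt):

// project/plugins.sbt -- provides the `assembly` task and assemblyMergeStrategy used above
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

Building with sbt assembly then merges the META-INF/services entries (MergeStrategy.filterDistinctLines) instead of discarding them, so the kafka DataSourceRegister registration ends up in the fat jar you submit.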

