Why does format("kafka") fail with "Failed to find data source: kafka." (even with uber-jar)?


Problem description

I use HDP-2.6.3.0 with Spark2 package 2.2.0.

I'm trying to write a Kafka consumer using the Structured Streaming API, but I get the following error after submitting the job to the cluster:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:553)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:90)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at com.example.KafkaConsumer.main(KafkaConsumer.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:782)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:537)
... 17 more

I use the following spark-submit command:

$SPARK_HOME/bin/spark-submit \
     --master yarn \
     --deploy-mode client \
     --class com.example.KafkaConsumer \
     --executor-cores 2 \
     --executor-memory 512m \
     --driver-memory 512m \
     sample-kafka-consumer-0.0.1-SNAPSHOT.jar

My Java code:

package com.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KafkaConsumer {

    public static void main(String[] args) {

        SparkSession spark = SparkSession
                  .builder()
                  .appName("kafkaConsumerApp")
                  .getOrCreate();

        Dataset<Row> ds = spark
                  .readStream()
                  .format("kafka")
                  .option("kafka.bootstrap.servers", "dog.mercadoanalitico.com.br:6667")
                  .option("subscribe", "my-topic")
                  .load();
    }
}

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>sample-kafka-consumer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

    <dependencies>

        <!-- spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
        </dependency>


    </dependencies>  


    <repositories>
        <repository>
            <id>local-maven-repo</id>
            <url>file:///${project.basedir}/local-maven-repo</url>
        </repository>
    </repositories> 

    <build>

        <!-- Include resources folder in the .jar -->
        <resources>
            <resource>
                <directory>${basedir}/src/main/resources</directory>
            </resource>
        </resources>

        <plugins>

            <!-- Plugin to compile the source. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>       

            <!-- Plugin to include all the dependencies in the .jar and set the main class. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <!-- This filter is to workaround the problem caused by included signed jars.
                                     java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
                                -->
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.KafkaConsumer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>    
</project>

[UPDATE] UBER-JAR

Below is the configuration used in pom.xml to generate the uber-jar:

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <!-- This filter is to workaround the problem caused by included signed jars.
                                     java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
                                -->
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.KafkaConsumer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

Solution

The kafka data source is an external module and is not available to Spark applications by default.

You have to define it as a dependency in your pom.xml (as you have done), but that's only the first step towards having it available in your Spark application.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>

With that dependency in place, you have to decide whether to create a so-called uber-jar that bundles all the dependencies together (which results in a fairly big jar file and makes submission take longer) or to use the --packages option (or the less flexible --jars) to add the dependency at spark-submit time.

(There are other options, like storing the required jars on Hadoop HDFS or using Hadoop distribution-specific ways of defining dependencies for Spark applications, but let's keep things simple.)

I'd recommend using --packages first, and considering the other options only once that works.

Use spark-submit --packages to include the spark-sql-kafka-0-10 module as follows.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

Include the other command-line options as you wish.

Uber-Jar Approach

Including all the dependencies in a so-called uber-jar may not always work due to how META-INF directories are handled.

For the kafka data source (and data sources in general) to work, you have to ensure that the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files of all the data sources are merged (not replaced, reduced to the first one found, or handled by whatever other strategy you use).
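The answer does not name a specific setting for merging these files. With the maven-shade-plugin used in the question, one common way is to add the plugin's ServicesResourceTransformer next to the existing ManifestResourceTransformer. The fragment below is a sketch of how the transformers section from the question's pom.xml might look with that addition; whether it is sufficient for this particular build is an assumption.

```xml
<transformers>
    <!-- Assumption: merges META-INF/services files from all dependencies
         instead of keeping only one copy of each file. -->
    <transformer
        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    <!-- Unchanged from the question's pom.xml. -->
    <transformer
        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
        <mainClass>com.example.KafkaConsumer</mainClass>
    </transformer>
</transformers>
```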

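The kafka data source uses its own META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file, which registers org.apache.spark.sql.kafka010.KafkaSourceProvider as the data source provider for the kafka format. If that entry does not survive in the uber-jar, Spark cannot resolve the short name kafka and falls back to looking for a class named kafka.DefaultSource, which is the "Caused by" exception in the stack trace above.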
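To check whether the registration survived the build, you can inspect the service file inside the uber-jar. The following is a minimal sketch using only the JDK; the class name ServiceFileCheck and its helper method are hypothetical names introduced for illustration, not part of Spark or Maven.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper: lists the data source providers registered in a jar.
class ServiceFileCheck {

    // Service file named in the answer.
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister";

    // Provider class that the answer says registers the kafka format.
    static final String KAFKA_PROVIDER =
            "org.apache.spark.sql.kafka010.KafkaSourceProvider";

    // Returns the provider class names listed in the jar's service file,
    // or an empty list if the jar has no such file.
    static List<String> listProviders(String jarPath) throws IOException {
        List<String> providers = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(SERVICE_FILE);
            if (entry == null) {
                return providers;
            }
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Drop '#' comments and surrounding whitespace.
                    int hash = line.indexOf('#');
                    if (hash >= 0) {
                        line = line.substring(0, hash);
                    }
                    line = line.trim();
                    if (!line.isEmpty()) {
                        providers.add(line);
                    }
                }
            }
        }
        return providers;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: ServiceFileCheck <path-to-jar>");
            System.exit(2);
        }
        List<String> providers = listProviders(args[0]);
        providers.forEach(System.out::println);
        System.out.println(providers.contains(KAFKA_PROVIDER)
                ? "kafka provider is registered"
                : "kafka provider is MISSING");
    }
}
```

Compile it with javac and pass the path of the shaded jar as the only argument. If the kafka provider is reported as missing, the service files were probably not merged when the uber-jar was built.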
