PySpark Structured Streaming + Kafka Error (Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamWriteSupport)


Problem description


I am trying to run Python Spark Structured Streaming + Kafka. When I run the command

Master@MacBook-Pro spark-3.0.0-preview2-bin-hadoop2.7 % bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5 \
examples/src/main/python/sql/streaming/structured_kafka_wordcount.py \
/Users/Master/Projects/bank_kafka_spark/spark_job1.py localhost:9092 transaction

I receive the following:

20/04/22 13:06:04 WARN Utils: Your hostname, MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.103 instead (on interface en0)
20/04/22 13:06:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.0-preview2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Ivy Default Cache set to: /Users/Master/.ivy2/cache
The jars for the packages stored in: /Users/Master/.ivy2/jars
:: loading settings :: url = jar:file:/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cd5905ea-5f80-4b14-995d-6ba03a353bb0;1.0
        confs: [default]
        found org.apache.spark#spark-sql-kafka-0-10_2.12;2.4.5 in central
        found org.apache.kafka#kafka-clients;2.0.0 in central
        found org.lz4#lz4-java;1.4.0 in central
        found org.xerial.snappy#snappy-java;1.1.7.3 in central
        found org.slf4j#slf4j-api;1.7.16 in central
        found org.spark-project.spark#unused;1.0.0 in local-m2-cache
:: resolution report :: resolve 315ms :: artifacts dl 6ms
        :: modules in use:
        org.apache.kafka#kafka-clients;2.0.0 from central in [default]
        org.apache.spark#spark-sql-kafka-0-10_2.12;2.4.5 from central in [default]
        org.lz4#lz4-java;1.4.0 from central in [default]
        org.slf4j#slf4j-api;1.7.16 from central in [default]
        org.spark-project.spark#unused;1.0.0 from local-m2-cache in [default]
        org.xerial.snappy#snappy-java;1.1.7.3 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   6   |   0   |   0   |   0   ||   6   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-cd5905ea-5f80-4b14-995d-6ba03a353bb0
        confs: [default]
        0 artifacts copied, 6 already retrieved (0kB/6ms)
20/04/22 13:06:04 DEBUG NativeCodeLoader: Trying to load the custom-built native-hadoop library...
20/04/22 13:06:04 DEBUG NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path: [/Users/Master/Library/Java/Extensions, /Library/Java/Extensions, /Network/Library/Java/Extensions, /System/Library/Java/Extensions, /usr/lib/java, .]
20/04/22 13:06:04 DEBUG NativeCodeLoader: java.library.path=/Users/Master/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
20/04/22 13:06:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/examples/src/main/python/sql/streaming/structured_kafka_wordcount.py", line 68, in <module>
    .option(subscribeType, topics)\
  File "/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 406, in load
  File "/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 98, in deco
  File "/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamWriteSupport
        at java.base/java.lang.ClassLoader.defineClass1(Native Method)
        at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1016)
        at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:151)
        at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:821)
        at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:719)
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:642)
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:600)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:575)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Class.java:416)
        at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1210)
        at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1221)
        at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1265)
        at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1300)
        at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1385)
        at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
        at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
        at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
        at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
        at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
        at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:644)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:170)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:567)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamWriteSupport
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:602)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        ... 43 more


I am using the structured_kafka_wordcount.py example that ships with PySpark (examples/src/main/python/sql/streaming/structured_kafka_wordcount.py):

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
 Consumes messages from one or more topics in Kafka and does wordcount.
 Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
   <bootstrap-servers> The Kafka "bootstrap.servers" configuration. A
   comma-separated list of host:port.
   <subscribe-type> There are three kinds of type, i.e. 'assign', 'subscribe',
   'subscribePattern'.
   |- <assign> Specific TopicPartitions to consume. Json string
   |  {"topicA":[0,1],"topicB":[2,4]}.
   |- <subscribe> The topic list to subscribe. A comma-separated list of
   |  topics.
   |- <subscribePattern> The pattern used to subscribe to topic(s).
   |  Java regex string.
   |- Only one of "assign, "subscribe" or "subscribePattern" options can be
   |  specified for Kafka source.
   <topics> Different value format depends on the value of 'subscribe-type'.

 Run the example
    `$ bin/spark-submit examples/src/main/python/sql/streaming/structured_kafka_wordcount.py \
    host1:port1,host2:port2 subscribe topic1,topic2`
"""
from __future__ import print_function

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
        """, file=sys.stderr)
        sys.exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredKafkaWordCount")\
        .getOrCreate()

    # Create DataSet representing the stream of input lines from kafka
    # (the .load() call below is where execution stops and returns the error above)
    lines = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .selectExpr("CAST(value AS STRING)")

    # Split the lines into words
    words = lines.select(
        # explode turns each item in an array into a separate row
        explode(
            split(lines.value, ' ')
        ).alias('word')
    )

    # Generate running word count
    wordCounts = words.groupBy('word').count()

    # Start running the query that prints the running counts to the console
    query = wordCounts\
        .writeStream\
        .outputMode('complete')\
        .format('console')\
        .start()

    query.awaitTermination()

The Kafka server is running and the topic has been created.

Java version 13.0.2

Scala 2.13.1

Kafka 2.12-2.4.1

Spark spark-3.0.0-preview2-bin-hadoop2.7

What is the problem?

Solution

I was having the exact same issue until I realized I was adding the wrong dependency!

Instead of: --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5

Use: --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2
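
The 2.4.5 connector fails here because it was built against the Spark 2.4 DataSource V2 API, whose org.apache.spark.sql.sources.v2.StreamWriteSupport class no longer exists in Spark 3.0, so the connector version must match the Spark version. As a minimal sketch, assuming the spark-3.0.0-preview2-bin-hadoop2.7 directory, the local broker on localhost:9092, and the transaction topic from the question, and following the <bootstrap-servers> <subscribe-type> <topics> usage from the example's docstring, the corrected submit command would look something like:

bin/spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2 \
  examples/src/main/python/sql/streaming/structured_kafka_wordcount.py \
  localhost:9092 subscribe transaction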
