PySpark Structured Streaming + Kafka error (Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamWriteSupport)
Problem description
I am trying to run Python Spark Structured Streaming + Kafka. When I run the command
Master@MacBook-Pro spark-3.0.0-preview2-bin-hadoop2.7 % bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5 \
examples/src/main/python/sql/streaming/structured_kafka_wordcount.py \
/Users/Master/Projects/bank_kafka_spark/spark_job1.py localhost:9092 transaction
I receive the following output:
20/04/22 13:06:04 WARN Utils: Your hostname, MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.103 instead (on interface en0)
20/04/22 13:06:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.0-preview2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Ivy Default Cache set to: /Users/Master/.ivy2/cache
The jars for the packages stored in: /Users/Master/.ivy2/jars
:: loading settings :: url = jar:file:/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cd5905ea-5f80-4b14-995d-6ba03a353bb0;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.12;2.4.5 in central
found org.apache.kafka#kafka-clients;2.0.0 in central
found org.lz4#lz4-java;1.4.0 in central
found org.xerial.snappy#snappy-java;1.1.7.3 in central
found org.slf4j#slf4j-api;1.7.16 in central
found org.spark-project.spark#unused;1.0.0 in local-m2-cache
:: resolution report :: resolve 315ms :: artifacts dl 6ms
:: modules in use:
org.apache.kafka#kafka-clients;2.0.0 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.12;2.4.5 from central in [default]
org.lz4#lz4-java;1.4.0 from central in [default]
org.slf4j#slf4j-api;1.7.16 from central in [default]
org.spark-project.spark#unused;1.0.0 from local-m2-cache in [default]
org.xerial.snappy#snappy-java;1.1.7.3 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 6 | 0 | 0 | 0 || 6 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-cd5905ea-5f80-4b14-995d-6ba03a353bb0
confs: [default]
0 artifacts copied, 6 already retrieved (0kB/6ms)
20/04/22 13:06:04 DEBUG NativeCodeLoader: Trying to load the custom-built native-hadoop library...
20/04/22 13:06:04 DEBUG NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path: [/Users/Master/Library/Java/Extensions, /Library/Java/Extensions, /Network/Library/Java/Extensions, /System/Library/Java/Extensions, /usr/lib/java, .]
20/04/22 13:06:04 DEBUG NativeCodeLoader: java.library.path=/Users/Master/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
20/04/22 13:06:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
File "/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/examples/src/main/python/sql/streaming/structured_kafka_wordcount.py", line 68, in <module>
.option(subscribeType, topics)\
File "/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 406, in load
File "/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
File "/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 98, in deco
File "/Users/Master/Projects/spark-3.0.0-preview2-bin-hadoop2.7/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamWriteSupport
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1016)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:151)
at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:821)
at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:719)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:642)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:600)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:575)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:416)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1210)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1221)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1265)
at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1300)
at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1385)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:644)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:170)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamWriteSupport
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:602)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
... 43 more
I am using the PySpark example examples/src/main/python/sql/streaming/structured_kafka_wordcount.py:
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Consumes messages from one or more topics in Kafka and does wordcount.
Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
<bootstrap-servers> The Kafka "bootstrap.servers" configuration. A
comma-separated list of host:port.
<subscribe-type> There are three kinds of type, i.e. 'assign', 'subscribe',
'subscribePattern'.
|- <assign> Specific TopicPartitions to consume. Json string
| {"topicA":[0,1],"topicB":[2,4]}.
|- <subscribe> The topic list to subscribe. A comma-separated list of
| topics.
|- <subscribePattern> The pattern used to subscribe to topic(s).
| Java regex string.
|- Only one of "assign, "subscribe" or "subscribePattern" options can be
| specified for Kafka source.
<topics> Different value format depends on the value of 'subscribe-type'.
Run the example
`$ bin/spark-submit examples/src/main/python/sql/streaming/structured_kafka_wordcount.py \
host1:port1,host2:port2 subscribe topic1,topic2`
"""
from __future__ import print_function

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
        """, file=sys.stderr)
        sys.exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredKafkaWordCount")\
        .getOrCreate()

    # Create DataSet representing the stream of input lines from kafka
    # HERE IT STOPS AND RETURNS ERROR (on the .option(subscribeType, topics) line)
    lines = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .selectExpr("CAST(value AS STRING)")

    # Split the lines into words
    words = lines.select(
        # explode turns each item in an array into a separate row
        explode(
            split(lines.value, ' ')
        ).alias('word')
    )

    # Generate running word count
    wordCounts = words.groupBy('word').count()

    # Start running the query that prints the running counts to the console
    query = wordCounts\
        .writeStream\
        .outputMode('complete')\
        .format('console')\
        .start()

    query.awaitTermination()
The Kafka server is running and the topic has been created.
Java version 13.0.2
Scala 2.13.1
Kafka 2.12-2.4.1
Spark spark-3.0.0-preview2-bin-hadoop2.7
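A quick way to confirm which Spark and Scala versions the distribution itself was built with (the versions a --packages coordinate must match) is:

bin/spark-submit --version

For spark-3.0.0-preview2 this reports Scala 2.12, consistent with the spark-unsafe_2.12-3.0.0-preview2.jar that appears in the log above.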
What is the problem?
I was having the exact same issue until I realized I was adding the wrong dependency: the version of the spark-sql-kafka connector has to match the Spark version you run it on. The 2.4.5 artifact was compiled against Spark 2.4's DataSource V2 API, which still contained org.apache.spark.sql.sources.v2.StreamWriteSupport; that API was redesigned in Spark 3.0, so the class no longer exists on the classpath, hence the ClassNotFoundException.
Instead of: --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5
Use: --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2
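For reference, a sketch of the full corrected invocation, reusing the host and topic from the question. Note that structured_kafka_wordcount.py takes exactly three arguments (<bootstrap-servers> <subscribe-type> <topics>), so it looks like the stray /Users/Master/Projects/bank_kafka_spark/spark_job1.py path in the original command should be dropped and a subscribe-type such as subscribe passed instead:

bin/spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2 \
  examples/src/main/python/sql/streaming/structured_kafka_wordcount.py \
  localhost:9092 subscribe transaction

As a rule of thumb, keep the spark-sql-kafka-0-10 artifact's version in lockstep with your Spark version, and its Scala suffix (_2.12 here) in lockstep with the Scala version Spark was built with.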