PySpark Kafka py4j.protocol.Py4JJavaError: An error occurred while calling o28.load
Problem description
While converting Kafka messages to a dataframe, I get an error when passing the packages as an argument.
from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from kafka import KafkaConsumer
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars spark-sql-kafka-0-10_2.11-2.0.2.jar,spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar pyspark-shell'
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "Jim_Topic") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
py4j.protocol.Py4JJavaError: An error occurred while calling o28.load. : java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.kafka010.KafkaSourceProvider could not be instantiated
Answer
This is happening because the version of spark-sql-kafka does not match the Spark version you are currently running.
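A quick way to see which Spark version is actually running is to print it from the active session; the sketch below is not part of the original answer and just illustrates the check.
from pyspark.sql import SparkSession
# Print the running Spark version so the Kafka connector artifact can be chosen to match it.
spark = SparkSession.builder.getOrCreate()
print(spark.version)  # e.g. 2.4.1 -> use org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1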
For example, the dependency you are currently using would work for Spark 2.4.1:
org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1
To fix the issue, simply use the version of your Spark at the end of the dependency string (replace x.y.z):
org.apache.spark:spark-sql-kafka-0-10_2.11:x.y.z
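Putting it together, a minimal sketch of the corrected setup follows. It assumes PySpark was installed via pip, so pyspark.__version__ matches the running Spark version, and that a Scala 2.11 build is in use (as in the question); Spark 3.x ships _2.12 artifacts instead. The --packages flag lets Spark resolve the connector from Maven rather than relying on local jars.
import os
import pyspark

# Build the connector coordinate from the installed PySpark version so the
# spark-sql-kafka artifact always matches the running Spark version.
spark_version = pyspark.__version__  # e.g. "2.4.1"
kafka_pkg = "org.apache.spark:spark-sql-kafka-0-10_2.11:" + spark_version

# Must be set before the SparkSession (and its JVM) is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages " + kafka_pkg + " pyspark-shell"

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = (spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "Jim_Topic")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))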