pyspark.sql.utils.AnalysisException: Failed to find data source: kafka


Problem description

I am trying to read a stream from Kafka using pyspark. I am using Spark version 3.0.0-preview2 and spark-streaming-kafka-0-10_2.12. Before this I simply start zookeeper and Kafka and create a new topic:

/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties 
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic data_wm
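
To quickly check that the topic is actually receiving data, a few test messages can be pushed from Python. This is only a minimal sketch, assuming the kafka-python package is installed; it reuses the broker and topic from the commands above:

from kafka import KafkaProducer

# Push a few test messages into the data_wm topic (assumes kafka-python is installed).
producer = KafkaProducer(bootstrap_servers="localhost:9092")
for i in range(3):
    producer.send("data_wm", key=b"test", value=("test message %d" % i).encode("utf-8"))
producer.flush()   # block until the messages are actually delivered
producer.close()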

Here is my code:

import pandas as pd
import os
import findspark
findspark.init("/usr/local/spark")
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TestApp").getOrCreate()
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "data_wm") \
  .load() 
value = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 

This is how I run my script:

sudo --preserve-env=pyspark /usr/local/spark/bin/pyspark --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0-preview

As a result of this command I get:

:: resolving dependencies :: org.apache.spark#spark-submit-parent-0d7b2a8d-a860-4766-a4c7-141a902d8365;1.0
        confs: [default]
        found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.0.0-preview in central
        found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0-preview in central
        found org.apache.kafka#kafka-clients;2.3.1 in central
        found com.github.luben#zstd-jni;1.4.3-1 in central
        found org.lz4#lz4-java;1.6.0 in central
        found org.xerial.snappy#snappy-java;1.1.7.3 in central
        found org.slf4j#slf4j-api;1.7.16 in central
        found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 380ms :: artifacts dl 7ms
        :: modules in use:
        com.github.luben#zstd-jni;1.4.3-1 from central in [default]
        org.apache.kafka#kafka-clients;2.3.1 from central in [default]
        org.apache.spark#spark-streaming-kafka-0-10_2.12;3.0.0-preview from central in [default]
        org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0-preview from central in [default]
        org.lz4#lz4-java;1.6.0 from central in [default]
        org.slf4j#slf4j-api;1.7.16 from central in [default]
        org.spark-project.spark#unused;1.0.0 from central in [default]
        org.xerial.snappy#snappy-java;1.1.7.3 from central in [default]

But I always get this error:

>>> df = spark \
...   .readStream \
...   .format("kafka") \
...   .option("kafka.bootstrap.servers", "localhost:9092") \
...   .option("subscribe", "data_wm") \
...   .load()
Traceback (most recent call last):
  File "<stdin>", line 5, in <module>
  File "/usr/local/spark/python/pyspark/sql/streaming.py", line 406, in load
    return self._df(self._jreader.load())
  File "/usr/local/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 102, in deco
    raise converted
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

I don't know the cause of this error, please help

Recommended answer

I have successfully resolved this error on Spark 3.0.1 (using PySpark).

I would keep things simple and provide the desired packages through the --packages argument:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 MyPythonScript.py

Mind the order of arguments, otherwise it will throw an error.

where MyPythonScript.py contains:

from pyspark.sql import SparkSession

KAFKA_TOPIC = "data_wm"
KAFKA_SERVER = "localhost:9092"

# creating an instance of SparkSession
spark_session = SparkSession \
    .builder \
    .appName("Python Spark create RDD") \
    .getOrCreate()

# Subscribe to 1 topic
df = spark_session \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_SERVER) \
    .option("subscribe", KAFKA_TOPIC) \
    .load()
print(df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))
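
Note that print() on a streaming DataFrame only shows its column schema, not the data; to actually see the Kafka records you still have to start a streaming query. A minimal sketch of a console sink (assuming you just want to eyeball the messages) would be:

# Start a streaming query that casts key/value to strings and prints them to the console.
value = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

query = value \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()  # keep the query running until interrupted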

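Alternatively, if you prefer launching the script with plain python rather than spark-submit, a sketch of the same idea is to request the connector through the spark.jars.packages property when building the session (the coordinates below are assumed to be the same ones passed to --packages above):

from pyspark.sql import SparkSession

# Pull the Kafka SQL connector at session start instead of on the spark-submit command line.
spark_session = SparkSession \
    .builder \
    .appName("Python Spark create RDD") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1") \
    .getOrCreate()

This only takes effect if it is set before the underlying JVM is created, i.e. when the script itself creates the SparkSession.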