PYSpark没有打印Kafka流中的任何数据,也没有失败 [英] Pyspark not printing any data from kafka stream, not failing either

查看:0
本文介绍了PYSpark没有打印Kafka流中的任何数据,也没有失败的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是Spark和Kafka的新手。使用从免费Kafka服务器提供商(Cloudkarafka)创建的Kafka服务器来使用数据。在运行pyspark代码(在Databricks上)以使用流数据时,流只是保持初始化,并且不获取任何内容。它既不会失败,也不会停止执行,只是将状态保持为流正在初始化。

代码:

from pyspark.sql.functions import col

kafkaServer="<server>"

editsDF=(spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers",kafkaServer)
        .option("sasl.username","<username>")
        .option("sasl.password","<password>")
        .option("group.id", "%s-consumer" % "<username>")
        .option("session.timeout.ms", 6000)
        .option("default.topic.config", {"auto.offset.reset": "smallest"})
        .option('security.protocol', 'SASL_SSL')
        .option('sasl.mechanisms', 'SCRAM-SHA-256')
        .option("subscribe","<topic>")
        .option("startingOffsets","latest")
        .option("maxOffsetsPerTrigger",1000)
        .load()
        .select(col("value").cast("STRING"))
        )


query = editsDF 
    .writeStream 
    .outputMode("append") 
    .format("console") 
    .start()

The status in databricks while running the code:

如果我遗漏了什么,请告诉我。提前谢谢。

注意:我已经确保了Kafka服务器能够生成消息,并且能够在一个python程序中使用它。但不是在火花源里工作。此外,数据大小非常小,因此不会出现性能问题。

编辑:这个建议的函数Display()仍然不会为这个有问题的Kafka服务器打印任何数据,但是当我尝试完全使用另一个Kafka服务器时,它工作得很好。我认为这是因为这台Kafka服务器(有问题)使用的是SASL-SCRAM身份验证,所以可能需要进行一些不同的配置。请提供任何详细信息/链接/样本,如果您有从派斯帕克连接SASL Kafka。谢谢!

推荐答案

当您使用console接收器时,它会将数据打印到标准输出(请参阅Spark docs),因此您需要检查群集用户界面中的驱动程序日志以获取生成的数据。

要查看Databricks笔记本本身中的数据,您需要使用display函数,该函数支持显示结构化流中的数据(请参阅Databricks docs)。因此,不是

query = editsDF 
    .writeStream 
    .outputMode("append") 
    .format("console") 
    .start()

您只需写:

display(editsDF)

您还可以将其他选项传递给此函数,如checkpointLocationtrigger等。-检查我上面链接的文档。

这篇关于PYSpark没有打印Kafka流中的任何数据,也没有失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆