PYSpark没有打印Kafka流中的任何数据,也没有失败 [英] Pyspark not printing any data from kafka stream, not failing either
本文介绍了PYSpark没有打印Kafka流中的任何数据,也没有失败的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我是Spark和Kafka的新手。使用从免费Kafka服务器提供商(Cloudkarafka)创建的Kafka服务器来使用数据。在运行pyspark代码(在Databricks上)以使用流数据时,流只是保持初始化,并且不获取任何内容。它既不会失败,也不会停止执行,只是将状态保持为流正在初始化。
代码:
from pyspark.sql.functions import col
kafkaServer="<server>"
editsDF=(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers",kafkaServer)
.option("sasl.username","<username>")
.option("sasl.password","<password>")
.option("group.id", "%s-consumer" % "<username>")
.option("session.timeout.ms", 6000)
.option("default.topic.config", {"auto.offset.reset": "smallest"})
.option('security.protocol', 'SASL_SSL')
.option('sasl.mechanisms', 'SCRAM-SHA-256')
.option("subscribe","<topic>")
.option("startingOffsets","latest")
.option("maxOffsetsPerTrigger",1000)
.load()
.select(col("value").cast("STRING"))
)
query = editsDF
.writeStream
.outputMode("append")
.format("console")
.start()
The status in databricks while running the code:
如果我遗漏了什么,请告诉我。提前谢谢。
注意:我已经确保了Kafka服务器能够生成消息,并且能够在一个python程序中使用它。但不是在火花源里工作。此外,数据大小非常小,因此不会出现性能问题。
编辑:这个建议的函数Display()仍然不会为这个有问题的Kafka服务器打印任何数据,但是当我尝试完全使用另一个Kafka服务器时,它工作得很好。我认为这是因为这台Kafka服务器(有问题)使用的是SASL-SCRAM身份验证,所以可能需要进行一些不同的配置。请提供任何详细信息/链接/样本,如果您有从派斯帕克连接SASL Kafka。谢谢!
推荐答案
当您使用console
接收器时,它会将数据打印到标准输出(请参阅Spark docs),因此您需要检查群集用户界面中的驱动程序日志以获取生成的数据。
display
函数,该函数支持显示结构化流中的数据(请参阅Databricks docs)。因此,不是
query = editsDF
.writeStream
.outputMode("append")
.format("console")
.start()
您只需写:
display(editsDF)
您还可以将其他选项传递给此函数,如checkpointLocation
、trigger
等。-检查我上面链接的文档。
这篇关于PYSpark没有打印Kafka流中的任何数据,也没有失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文