spark-structured-streaming相关内容

PYSpark没有打印Kafka流中的任何数据,也没有失败

我是Spark和Kafka的新手。使用从免费Kafka服务器提供商(Cloudkarafka)创建的Kafka服务器来使用数据。在运行pyspark代码(在Databricks上)以使用流数据时,流只是保持初始化,并且不获取任何内容。它既不会失败,也不会停止执行,只是将状态保持为流正在初始化。 代码: from pyspark.sql.functions import col kaf ..

电光结构化流媒体:处理负载是否影响输入率/数字输入记录?

我当前的结构化流应用程序写入一个巨大的Delta表。当我(停止流)并将其指向写入全新的增量表时: 它变得更快-批量持续时间减少了近四分之一 投入率几乎提高了3倍 我知道它可能会变得更快,因为它在旧的/更大的表上执行的任何聚合/写入在新表上都不需要。但投入率的变化我希望有人能解释一下? 源为Azure EventHubs。 谢谢! 推荐答案 回答我自己的问题: ..

为什么 Spark 应用程序失败并显示“ClassNotFoundException:无法找到数据源:kafka"作为带有 sbt 程序集的 uber-jar?

我正在尝试运行像 StructuredKafkaWordCount.我从 Spark 结构化流式编程指南开始. 我的代码是 包 io.boontadata.spark.job1导入 org.apache.spark.sql.SparkSession对象 DirectKafkaAggregateEvents {val FIELD_MESSAGE_ID = 0val FIELD_DEVICE_ ..

使用 RabbitMQ 源的 Spark 结构化流

我正在尝试为 Structured Streaming 编写一个自定义接收器,它将使用来自 RabbitMQ 的消息.Spark 最近发布 DataSource V2 API,看起来很有前途.由于它抽象了许多细节,我想使用这个 API 以既简单又性能好.但是,由于它很新,因此可用的资源并不多.我需要经验丰富的 Spark 人员的说明,因为他们会更容易掌握关键点.我们开始: 我的起点是博客文章 ..
发布时间:2022-01-11 17:27:46 Java开发

如何从 Kafka 读取 XML 格式的流数据?

我正在尝试使用 Spark 结构化流从 Kafka 主题中读取 XML 数据. 我尝试使用 Databricks spark-xml 包,但我收到一条错误消息,指出此包不支持流式读取.有什么方法可以使用结构化流从 Kafka 主题中提取 XML 数据? 我当前的代码: df = spark \.readStream \.format(“卡夫卡")\.format('com.datab ..

如何在 spark 3.0 结构化流媒体中使用 kafka.group.id 和检查点以继续从 Kafka 读取它在重启后停止的位置?

基于Spark 3.0中的介绍,https:///spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.应该可以设置“kafka.group.id"跟踪偏移量.对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失.根据我之前的问题,我觉得 Spark 3.0 中的 kafka.g ..

如何从 Zeppelin 中的控制台流接收器获取输出?

我正在努力让 console 接收器与 从 Zeppelin 运行时的 PySpark 结构化流.基本上,我没有看到任何结果打印到屏幕上,也没有看到我找到的任何日志文件. 我的问题: 有没有人有将 PySpark 结构化流与接收器一起使用的工作示例,该接收器产生在 Apache Zeppelin 中可见的输出?理想情况下,它还可以使用套接字源,因为这很容易测试. 我正在使用: U ..

spark结构化流式批处理数据刷新问题(partition by子句)

我在将 Spark 结构化流数据帧与批处理数据帧结合时遇到问题,我的场景我有一个 S3 流,它需要与历史数据进行左反连接,该数据返回历史中不存在的记录(找出新记录)并且我将这些记录作为新的追加写入历史记录(按列分区磁盘数据分区而不是内存). 当我刷新已分区的历史数据框时,我的历史数据框没有更新. 下面是两个代码片段,一个有效,另一个无效. 工作代码和非工作代码之间的唯一区别是 p ..

在 Spark 结构化流中反序列化 kafka avro 主题的 int 编码无效

我正在尝试使用 spark 结构化流(版本 2.3.1)处理来自 kafka 的流 avro 数据,所以我尝试使用 this 反序列化示例.仅当主题 value 部分包含 StringType 时才有效,但在我的情况下,架构包含 long 和 integers,如下所示: public static final String USER_SCHEMA = “{";+ "\"类型\":\"记录\", ..