spark-structured-streaming相关内容

如何在 Spark 结构化流中手动设置 group.id 并提交 kafka 偏移量?

我正在阅读 Spark 结构化流媒体 - Kafka 集成指南 此处. 在这个链接中被告知 enable.auto.commit:Kafka 源不提交任何偏移量. 那么,一旦我的 Spark 应用程序成功处理了每条记录,我该如何手动提交偏移量? 解决方案 tl;dr 无法向 Kafka 提交任何消息.从 Spark 3.x 版本开始,您可以定义 Kafka 消费者组 ..

将 Spark Structured Streaming 与 Confluent Schema Registry 集成

我在 Spark Structured Streaming 中使用 Kafka Source 来接收 Confluent 编码的 Avro 记录.我打算使用 Confluent Schema Registry,但是与 Spark 结构化流的集成似乎是不可能的. 我见过这个问题,但无法在 Confluent Schema Registry 中使用.使用 Spark 2.0 从 Kafka 读取 ..

结构化流自定义重复数据删除

我有一个从 kafka 传入 dataFrame 的流数据.我想根据 Id 删除重复项并根据时间戳保留最新记录. 样本数据是这样的: Id 名称计数时间戳1 维卡斯 20 2018-09-19T10:10:102 维杰 50 2018-09-19T10:10:203 维拉斯 30 2018-09-19T10:10:304 维沙尔 10 2018-09-19T10:10:401 维卡斯 50 ..
发布时间:2021-07-15 21:20:01 其他开发

Spark Structured Streaming Kafka Integration Offset 管理

文档说: enable.auto.commit:Kafka 源不提交任何偏移量. 因此我的问题是,如果工作程序或分区崩溃/重启: startingOffsets 设置为最新,我们如何不丢失消息? startingOffsets 设置为最早,我们如何不重新处理所有消息? 这似乎很重要.有关如何处理的任何指示? 解决方案 我也遇到了这个问题. 您对 2 个选项的 ..
发布时间:2021-07-15 21:02:57 其他开发

通过远程 Spark 作业出错:java.lang.IllegalAccessError:class org.apache.hadoop.hdfs.web.HftpFileSystem

问题 我正在尝试使用 Spark HDInsight 集群 (HDI 4.0) 通过 IntelliJ 运行远程 Spark 作业.在我的 Spark 应用程序中,我尝试使用内置 readStream 函数的 Spark 结构化流从 Azure blob 存储中的镶木地板文件文件夹读取输入流. 当我在连接到 HDInsight 群集的 Zeppelin 笔记本上运行代码时,该代码按预期 ..

spark结构化流式批处理数据刷新问题(partition by子句)

我在将 Spark 结构化流数据帧与批处理数据帧结合时遇到问题,我的场景我有一个 S3 流,它需要与历史数据进行左反连接,该数据返回历史中不存在的记录(找出新记录)我将这些记录作为新的追加写入历史记录(按列分区磁盘数据分区而不是内存). 当我刷新已分区的历史数据框时,我的历史数据框没有更新. 下面是两个代码片段,一个有效,另一个无效. 工作代码和非工作代码之间的唯一区别是 par ..

如何使用 PySpark 结构化流计算时间戳之间的差异

我在使用 PySpark 结构化流时遇到以下问题. 我的流数据中的每一行都有一个用户 ID 和一个时间戳.现在,对于每一行和每个用户,我想添加一个具有时间戳差异的列. 例如,假设我收到的第一行内容是:“用户 A,08:00:00".如果第二行显示“用户 A,08:00:10",那么我想在第二行中添加一个名为“间隔"的列,表示“10 秒". 有没有人知道如何实现这一目标?我尝试使用 ..
发布时间:2021-06-24 20:44:52 其他开发

如何将 kafka 上的火花流嵌套 json 转换为平面数据帧?

我第一次尝试将来自 Kafka 的 JSON 解析为 Spark 结构化流时需要一些帮助. 我正在努力转换传入的 JSON 并将其转换为平面数据帧以供进一步处理. 我的输入json是 [{ "siteId": "30:47:47:BE:16:8F", "siteData":[{ "dataseries": "trend-255", "values":[{"ts": 150271560 ..

pyspark 结构化流式写入分批写入镶木地板

我正在对 Spark 结构化流数据帧进行一些转换.我将转换后的数据帧作为镶木地板文件存储在 hdfs 中.现在我希望写入 hdfs 应该分批进行,而不是先转换整个数据帧然后存储数据帧. 解决方案 这是一个镶木地板水槽示例: # 镶木地板水槽示例targetParquetHDFS = sourceTopicKAFKA.writeStream.format("parquet") # 可以是 ..

如何有效地更新文件非常频繁修改的Impala表

我们有一个基于Hadoop的解决方案(CDH 5.15),我们在其中的某些目录中获取HDFS中的新文件.在这些目录的顶部,我们有4-5个Impala(2.1)表.在HDFS中写入这些文件的过程是Spark结构化流式传输(2.3.1) 现在,一旦将文件写入HDFS,我们就会运行一些DDL查询: ALTER TABLE table1恢复分区,以检测添加到表中的新分区(及其HDFS目录和文件 ..

如何每隔5分钟获取最近1小时的数据而不进行分组?

如何每5分钟触发一次并获取最近1个小时的数据?我想出了这一点,但似乎并没有给我最后1个小时的所有记录.我的理由是: 读取流, 根据时间戳列过滤最近1小时的数据,并且 使用 forEachbatch 进行写入/打印.还有 为它添加水印,以免保留所有过去的数据. 火花.readStream.format("delta").table("xxx").withWatermark( ..

spark如何计算给定窗口间隔下的窗口开始时间?

考虑一下,当将窗口持续时间(无滑动间隔)设置为时,我有一个带有时间戳字段列的输入df: 10分钟 输入时间(2019-02-28 22:33:02) 形成的窗口为(2019-02-28 22:30:02)到(2019-02-28 22:40:02) 8分钟 使用相同的时间输入(2019-02-28 22:33:02) 形成的窗口为((2019-02-28 22:26:0 ..