spark-structured-streaming相关内容

有没有办法动态停止 Spark Structured Streaming?

在我的场景中,我有几个数据集不时出现,我需要在我们的平台中摄取它们.摄取过程涉及几个转换步骤.其中之一是 Spark.到目前为止,我特别使用火花结构化流媒体.基础设施还涉及 kafka,spark 结构化流从中读取数据. 我想知道是否有一种方法可以检测到某个主题在一段时间内没有其他东西可以消费时决定停止工作.那就是我想在消耗该特定数据集所需的时间内运行它,然后停止它.出于特定原因,我们决定不 ..

从多个 Kafka 主题读取 Spark 结构化流应用程序

我有一个 Spark 结构化流应用程序 (v2.3.2),它需要从许多 Kafka 主题中读取数据,进行一些相对简单的处理(主要是聚合和一些连接)并将结果发布到许多其他 Kafka 主题.因此在同一个应用程序中处理多个流. 我想知道,如果我只设置 1 个订阅多个主题的直接 readStream,然后使用选择拆分流,从资源的角度(内存、执行程序、线程、Kafka 侦听器等)来看,它是否会有所不 ..

结构化流式 Kafka 源偏移存储

我正在使用 Kafka 的结构化流源(集成指南),如前所述,它没有提交任何偏移量. 我的目标之一是监控它(检查它是否落后等).即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们.根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是: 它们存放在哪里?如果不提交偏移量,是否有任何方法可以监视火花流(结构化)的 kafka 消耗(从程 ..

java.lang.IllegalStateException:读取增量文件时出错,使用 kafka 触发结构化流

我在我们的项目中使用 Structured Streaming + Kafka 进行实时数据分析.我使用的是 Spark 2.2,kafka 0.10.2. 我在应用程序启动时从检查点恢复流式查询时遇到问题.由于从单个 kafka 流点派生出多个流查询,并且每个流查询都有不同的检查目录.因此,在作业失败的情况下,当我们重新启动作业时,会出现一些无法从检查点位置恢复的流查询,因此会引发异常读取 ..

Spark Structured Streaming - 处理每一行

我在 Spark 2.1.1 中使用结构化流.我需要对传入的消息应用一些业务逻辑(来自 Kafka 源). 本质上,我需要接收消息,获取一些关键值,在 HBase 中查找它们并在数据集上执行更多业务逻辑.最终结果是需要写出到另一个 Kafka 队列的字符串消息. 但是,由于传入消息的抽象是一个数据帧(无界表 - 结构化流),我必须通过 mapPartitions(由于 HBase 客户 ..

Kafka Connect cassandra 源 - 十进制数据类型的错误

我使用的是 kafka connect cassandra 源连接器 1.0 版本.我在 cassandra 表中有一个十进制数据类型列(价格),并将其作为来自源连接器的 json 写入 kafka 主题,它以某种字符串格式写入十进制值,例如 "price":"AA==".现在它在我的火花流中出错,同时将浮点数转换为“数字格式异常"....??请建议在 kafka 主题中写入值时可能出现的问题.提 ..

如何显示流式数据帧(如显示因 AnalysisException 而失败)?

所以我有一些数据在 Kafka 主题中流式传输,我正在获取这些流式数据并将其放入 DataFrame.我想在 DataFrame 中显示数据: import os从 kafka 导入 KafkaProducer从 pyspark.sql 导入 SparkSession、DataFrame导入时间从日期时间导入日期时间,时间增量os.environ['PYSPARK_SUBMIT_ARGS'] = ..

从Kafka主题读取文件路径,然后在结构化流中读取文件并写入DeltaLake

我有一个用例,其中存储在 s3 中的 json 记录的文件路径作为 kafka卡夫卡中的消息.我必须使用 Spark 结构化流处理数据. 我想到的设计如下: 在 kafka Spark 结构化流中,读取包含数据路径的消息. 在驱动程序中收集消息记录.(消息很小) 从数据位置创建数据框. kafkaDf.select($"value".cast(StringType)).wri ..

如何将 from_json 与 Kafka connect 0.10 和 Spark Structured Streaming 一起使用?

我试图从 [Databricks][1] 复制示例并将其应用到 Kafka 的新连接器并激发结构化流,但是我无法使用 Spark 中的开箱即用方法正确解析 JSON... 注意:主题以JSON格式写入Kafka. val ds1 = spark.readStream.format("卡夫卡").option("kafka.bootstrap.servers", IP + ":9092"). ..

如何在 spark 3.0 结构化流媒体中使用 kafka.group.id 和检查点以继续从 Kafka 中读取它在重启后停止的位置?

基于Spark 3.0中的介绍,https:///spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.应该可以设置“kafka.group.id"跟踪偏移量.对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失.根据我之前的问题,我觉得 Spark 3.0 中的 kafka.g ..

如何在Structured Streaming的kafka数据源中为消费者组设置group.id?

我想使用 Spark Structured Streaming 从安全的 kafka 中读取数据.这意味着我需要强制使用特定的 group.id.但是,正如文档中所述,这是不可能的.尽管如此,在 databricks 文档 https://docs 中.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ss ..

如何为结构化查询获取 Kafka 偏移量以进行手动和可靠的偏移量管理?

Spark 2.2 引入了 Kafka 的结构化流媒体源.据我了解,它依赖于 HDFS 检查点目录来存储偏移量并保证“恰好一次"消息传递. 但是旧码头(例如 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/) 说 Spark Stream ..