spark-structured-streaming相关内容

在Spark 2.x中将两个流合并到结构化流中的解决方法

我有一系列配置(不经常更改,但是如果有更新,它将是一条消息),以及另一组原始数据点. 据我了解,spark目前不支持加入流数据集或数据帧.有解决此问题的好方法吗? 是否可以将其中一个流数据集“快照"到静态数据集(可能是配置,因为它的更新较少),然后与另一个流数据集联接? 欢迎提出建议! 解决方案 所以这是我最后要做的事情. 将更新较少的流放入内存接收器.然后从该表中 ..
发布时间:2021-04-08 20:04:06 其他开发

为什么在流式数据帧/数据集上有流式聚合时,流式数据集会失败并显示“不支持完整输出模式"?

我使用Spark 2.2.0,并且在Windows上的Spark结构化流媒体出现以下错误: 当 streaming数据帧/数据集上有 streaming聚合而没有 watermark 时,不支持 完全输出模式. 解决方案 当不带水印的流数据帧/数据集上存在流聚合时,不支持完全输出模式 流聚合要求您告诉Spark结构化流引擎何时输出聚合(根据所谓的输出模式),因为可能属于聚 ..
发布时间:2021-04-08 20:02:47 其他开发

如何为Spark结构化流编写ElasticsearchSink

我正在使用Spark结构化流传输来处理来自Kafka队列的大量数据并进行大量的ML计算,但是我需要将结果写入Elasticsearch. 我尝试使用 ForeachWriter ,但无法在其中获取 SparkContext ,另一个选择可能是在 HTTP Post ForeachWriter . 现在,我正在考虑编写自己的ElasticsearchSink. 是否有任何文档可以为 ..

Spark结构化流中ForeachWriter的目的是什么?

有人可以解释Spark结构化流上foreach writer的需求吗? 当我们以dataFrame的形式获取所有源数据时,我没有使用foreachwriter. 解决方案 DataFrame是抽象的Spark概念,不会直接映射为可以执行的格式,例如写入控制台或数据库. 通过创建 ForeachWriter ,您将获取DataFrame的行(或批次),并定义如何 open()您要 ..
发布时间:2021-04-08 19:48:16 其他开发

在Spark结构流2.3.0中连接两个流时,左外部连接不发出空值

两个流上的左外部联接不发出空输出.它只是在等待将记录添加到另一个流中.使用套接字流对此进行测试.在我们的例子中,我们要发出的空值与id或/不匹配且不属于时间范围条件的记录 水印和间隔的详细信息是: val ds1Map = ds1.selectExpr("Id AS ds1_Id","ds1_timestamp").withWatermark("ds1_timestamp","10秒") ..
发布时间:2021-04-08 19:47:01 其他开发

如何在Spark结构化流中基于时间戳字段进行重复数据删除并保持最新状态?

Spark dropDuplicates 保留第一个实例,并忽略该键的所有后续出现.是否可以在保持最新发生的同时删除重复项? 例如,如果下面是我得到的微型批次,那么我想保留每个国家的最新记录(按时间戳记字段排序). 批处理ID:0 澳大利亚,10,2020-05-05 00:00:06白俄罗斯,10,2020-05-05 00:00:06 批处理ID:1 澳大利亚,10,202 ..

如何在流式查询中生成摘要统计信息(使用Summarizer.metrics)?

当前,我正在使用Spark结构化流式传输来创建(id,timestamp_value,device_id,temperature_value,comment)形式的随机数据的数据帧. 每批 Spark数据帧: 我想要的是输出"Summarizer"的结果.功能到我的控制台.这就是为什么我使用"append"(添加)用于outputMode和格式“控制台".但是我遇到了这个错误: pyspar ..
发布时间:2021-04-08 19:45:00 其他开发

如何使用Trigger.Once选项在Spark 3 Structure Stream Kafka/Files源中配置反向压力

在Spark 3中,更改了Kafka上的反压选项,并且Trigger.once场景的文件源已更改. 但是我有一个问题.当我想使用TriggerOnce时如何为我的工作配置背压? 在spark 2.4中,我有一个用例,以回填一些数据,然后启动流.所以我只使用一次触发器,但是我的回填场景可能非常大,有时由于混洗和驱动程序内存而对磁盘造成过大的负载,因为FileIndex缓存在此.因此,我使 ..

如何在Spark结构化流中将两个流df写入MySQL的两个不同表中?

我正在使用spark 2.3.2版本. 我已经在Spark结构化流中编写了代码,以将流数据帧数据插入到两个不同的MySQL表中. 假设有两个流式df:DF1,DF2. 我已经使用foreachWriter API编写了两个查询(query1,query2),分别从不同的流中写入MySQL表.IE.DF1进入MYSQLtable A,DF2进入MYSQL表B. 当我运行spa ..
发布时间:2021-04-08 19:38:37 其他开发

awaitTermination后如何获取流式查询的进度?

我是Spark的新手,正在阅读一些有关监视Spark应用程序的内容.基本上,我想知道在给定的触发时间和查询进度下,spark应用程序处理了多少条记录.我知道'lastProgress'会提供所有这些指标,但是当我将awaitTermination与'lastProgress'一起使用时,它总是返回null. val q4s = spark.readStream.format("kafka"). ..
发布时间:2021-04-08 19:36:26 其他开发

Spark结构化流:多个接收器

我们从Kafka那里使用结构化流,并将处理后的数据集写入s3. 我们还希望将处理后的数据向前写入Kafka,是否可以从同一流查询中进行处理?(火花版本2.1.1) 在日志中,我看到了流式查询进度输出,并且从日志中获得了一个示例持续时间JSON,能否有人请更清楚地说明 addBatch 和 getBatch ? TriggerExecution-处理获取的数据和写入接收器都需要时间 ..
发布时间:2021-04-08 19:35:09 其他开发

如何从Kafka读取XML格式的流数据?

我正在尝试使用Spark结构化流技术从Kafka主题中读取XML数据. 我尝试使用Databricks的 spark-xml 程序包,但是出现错误消息,说该程序包不支持流式读取.有什么方法可以使用结构化流从Kafka主题提取XML数据? 我当前的代码: df = spark \.readStream \.format("kafka")\.format('com.databricks ..

从HDFS读取时,Spark结构化流不写入数据

我正在研究一种流脚本,该脚本应在文件落入HDFS时立即将其拾取,聚合并将它们写入其他位置. 在这里,我无法进行写操作-它创建了元数据文件夹,但没有实际写操作发生.在10多个文件(结构相同)中,只有一个被写入,我不确定为什么 有人可以帮我吗? 从pyspark.sql中的 导入SparkSession导入pyspark.sql.functions作为sqlfunc导入argparse ..
发布时间:2021-04-08 19:29:45 Python