spark-structured-streaming相关内容
scala>val map1 = spark.sql("select map('p1','s1','p2','s2')") map1:org.apache.spark.sql.DataFrame = [map(p1,s1,p2,s2):map] scala>map1.show()+ -------------------- +|map(p1,s1,p2,s
..
我有一系列配置(不经常更改,但是如果有更新,它将是一条消息),以及另一组原始数据点. 据我了解,spark目前不支持加入流数据集或数据帧.有解决此问题的好方法吗? 是否可以将其中一个流数据集“快照"到静态数据集(可能是配置,因为它的更新较少),然后与另一个流数据集联接? 欢迎提出建议! 解决方案 所以这是我最后要做的事情. 将更新较少的流放入内存接收器.然后从该表中
..
我使用Spark 2.2.0,并且在Windows上的Spark结构化流媒体出现以下错误: 当 streaming数据帧/数据集上有 streaming聚合而没有 watermark 时,不支持 完全输出模式. 解决方案 当不带水印的流数据帧/数据集上存在流聚合时,不支持完全输出模式 流聚合要求您告诉Spark结构化流引擎何时输出聚合(根据所谓的输出模式),因为可能属于聚
..
我正在使用Spark结构化流传输来处理来自Kafka队列的大量数据并进行大量的ML计算,但是我需要将结果写入Elasticsearch. 我尝试使用 ForeachWriter ,但无法在其中获取 SparkContext ,另一个选择可能是在 HTTP Post ForeachWriter . 现在,我正在考虑编写自己的ElasticsearchSink. 是否有任何文档可以为
..
运行火花作业时出现以下错误: org.apache.spark.sql.AnalysisException:当流数据帧/数据集上存在流聚合时,不支持追加输出模式; 我不确定是否由于缺少 用法: val theGroupedDF = theDF.multiplyYieldByHundred.explodeDates.aggregateByValueval query = the
..
我正在关注 它失败并显示: 在处理上述异常期间,发生了另一个异常:AnalysisExceptionTraceback(最近一次通话结束)在()中---->1 spark.sql(“从id_counts中选择*").show()sql(/sqlQuery)中的/usr/spark-2.0.2/python/py
..
我正试图将Spark结构化流式(2.3)数据集写入ScyllaDB(Cassandra). 我写数据集的代码: def saveStreamSinkProvider(ds:Dataset [InvoiceItemKafka])= {ds.writeStream.format("cassandra.ScyllaSinkProvider").outputMode(OutputMode.App
..
在spark结构流之上执行spark SQL时遇到一些问题.PFA错误. 这是我的代码 对象sparkSqlIntegration {def main(args:Array [String]){val spark = SparkSession.builder.appName("StructuredStreaming").master("local [*]").config("spark.s
..
有人可以解释Spark结构化流上foreach writer的需求吗? 当我们以dataFrame的形式获取所有源数据时,我没有使用foreachwriter. 解决方案 DataFrame是抽象的Spark概念,不会直接映射为可以执行的格式,例如写入控制台或数据库. 通过创建 ForeachWriter ,您将获取DataFrame的行(或批次),并定义如何 open()您要
..
两个流上的左外部联接不发出空输出.它只是在等待将记录添加到另一个流中.使用套接字流对此进行测试.在我们的例子中,我们要发出的空值与id或/不匹配且不属于时间范围条件的记录 水印和间隔的详细信息是: val ds1Map = ds1.selectExpr("Id AS ds1_Id","ds1_timestamp").withWatermark("ds1_timestamp","10秒")
..
Spark dropDuplicates 保留第一个实例,并忽略该键的所有后续出现.是否可以在保持最新发生的同时删除重复项? 例如,如果下面是我得到的微型批次,那么我想保留每个国家的最新记录(按时间戳记字段排序). 批处理ID:0 澳大利亚,10,2020-05-05 00:00:06白俄罗斯,10,2020-05-05 00:00:06 批处理ID:1 澳大利亚,10,202
..
当前,我正在使用Spark结构化流式传输来创建(id,timestamp_value,device_id,temperature_value,comment)形式的随机数据的数据帧. 每批 Spark数据帧: 我想要的是输出"Summarizer"的结果.功能到我的控制台.这就是为什么我使用"append"(添加)用于outputMode和格式“控制台".但是我遇到了这个错误: pyspar
..
在Spark 3中,更改了Kafka上的反压选项,并且Trigger.once场景的文件源已更改. 但是我有一个问题.当我想使用TriggerOnce时如何为我的工作配置背压? 在spark 2.4中,我有一个用例,以回填一些数据,然后启动流.所以我只使用一次触发器,但是我的回填场景可能非常大,有时由于混洗和驱动程序内存而对磁盘造成过大的负载,因为FileIndex缓存在此.因此,我使
..
我正在尝试使用Spark结构化流在本地计算机上运行一些测试. 在批处理模式下,这是我要处理的行: val recordSchema = StructType(List(StructField("Record",MapType(StringType,StringType),false)))val行=列表(排(Map("ID"->"1","STRUCTUREID"->"MFCD0086985
..
我正在使用spark 2.3.2版本. 我已经在Spark结构化流中编写了代码,以将流数据帧数据插入到两个不同的MySQL表中. 假设有两个流式df:DF1,DF2. 我已经使用foreachWriter API编写了两个查询(query1,query2),分别从不同的流中写入MySQL表.IE.DF1进入MYSQLtable A,DF2进入MYSQL表B. 当我运行spa
..
我是Spark的新手,正在阅读一些有关监视Spark应用程序的内容.基本上,我想知道在给定的触发时间和查询进度下,spark应用程序处理了多少条记录.我知道'lastProgress'会提供所有这些指标,但是当我将awaitTermination与'lastProgress'一起使用时,它总是返回null. val q4s = spark.readStream.format("kafka").
..
在Spark结构化流2.2.1中,到同一数据库接收器的 两个 Writestream 没有按顺序发生.请提出如何使它们依次执行的建议. val deleteSink = ds1.writestream.outputMode(“更新").foreach(mydbsink).开始()val UpsertSink = ds2.writestream.outputMode(“更新").foreach
..
我们从Kafka那里使用结构化流,并将处理后的数据集写入s3. 我们还希望将处理后的数据向前写入Kafka,是否可以从同一流查询中进行处理?(火花版本2.1.1) 在日志中,我看到了流式查询进度输出,并且从日志中获得了一个示例持续时间JSON,能否有人请更清楚地说明 addBatch 和 getBatch ? TriggerExecution-处理获取的数据和写入接收器都需要时间
..
我正在尝试使用Spark结构化流技术从Kafka主题中读取XML数据. 我尝试使用Databricks的 spark-xml 程序包,但是出现错误消息,说该程序包不支持流式读取.有什么方法可以使用结构化流从Kafka主题提取XML数据? 我当前的代码: df = spark \.readStream \.format("kafka")\.format('com.databricks
..
我正在研究一种流脚本,该脚本应在文件落入HDFS时立即将其拾取,聚合并将它们写入其他位置. 在这里,我无法进行写操作-它创建了元数据文件夹,但没有实际写操作发生.在10多个文件(结构相同)中,只有一个被写入,我不确定为什么 有人可以帮我吗? 从pyspark.sql中的 导入SparkSession导入pyspark.sql.functions作为sqlfunc导入argparse
..