Any clue how to fix this Spark Structured Streaming join?

Problem Description


I am using spark-sql-2.4.1 with spark-cassandra-connector-2_11.jar.

I am trying to join two streaming datasets as shown below:

 Dataset<Row> companyInfo_df = company_info_df
                         .select("companyInfo.*" )
                         .withColumn("companyInfoEventTs", ( col("eventTs").divide(1000) ).cast(DataTypes.TimestampType))
                         .withWatermark("companyInfoEventTs", "60 seconds");

    Dataset<Row> companyFin_df = comapany_fin_df
                         .select("companyFin.*" )
                         .withColumn("eventTimeStamp", ( col("eventTs").divide(1000) ).cast(DataTypes.TimestampType))
                         .withWatermark("eventTimeStamp", "60 seconds")
                          .groupBy( 
                                   window(col("eventTimeStamp").cast(DataTypes.TimestampType), "30 seconds", "20 seconds", "10 seconds")
                                   ,col("company_id"),col("year"),col("quarter")
                                  )
                          .agg(   
                                  min("revenue").alias("min_revenue"), 
                                    max("revenue").alias("max_revenue") , 
                                    avg("revenue").alias("mean_revenue"),
                                    first("eventTimeStamp").alias("companyFinEventTs")
                              )
                          .select("company_id","year", "quarter", "companyFinEventTs", "window.start","window.end","min_revenue", "max_revenue","mean_revenue");




     Dataset<Row> companyFinWithWatermark = companyFin_df.withWatermark("companyFinEventTs", "2 minutes");
     Dataset<Row> companyInfoWithWatermark = companyInfo_df.withWatermark("companyInfoEventTs", "3 minutes");

     Column joinExpr  = expr(" company_id = companyid AND  companyFinEventTs >= companyInfoEventTs AND companyFinEventTs <= companyInfoEventTs + interval 1 minutes ");


     Dataset<Row> companyDfAfterJoin2 = companyFinWithWatermark.join(companyInfoWithWatermark,
                                            joinExpr
                                            //,"leftOuter"
                                        )
            .withColumn("last_update_timestamp", current_timestamp())
            .withColumn( "avg_revenue", col("mean_revenue"))



     Dataset<Row> companyDfAfterJoin = companyDfAfterJoin2
                        //.withWatermark("companyFinEventTs", "60 seconds")
                        .select("company_id","company_name","year","quarter", "avg_revenue" ,"last_update_timestamp" , "companyFinEventTs");


   System.out.println(" companyDfAfterJoin *******************************");

     companyDfAfterJoin
        .writeStream()
        .format("console")
        .outputMode("append")
        .option("truncate", false)
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .start();

Any clue how it could be fixed, and what's wrong here?

I am getting the error below:

companyDfAfterJoin *******************************
Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Project [company_id#102, company_name#64, year#103, quarter#104, avg_revenue#216, last_update_timestamp#200, companyFinEventTs#137-T120000ms]
+- Project [company_id#102, company_name#64, year#103, quarter#104, avg_revenue#216, last_update_timestamp#200, companyFinEventTs#137-T120000ms]
   +- Project [company_id#102, year#103, quarter#104, companyFinEventTs#137-T120000ms, start#147, end#148, min_revenue#131, max_revenue#133, mean_revenue#135, company_name#64, registeredYear#66, headQuarteredCity#67, companyid#74, companyInfoEventTs#81-T180000ms, last_update_timestamp#200, mean_revenue#135 AS avg_revenue#216]
      +- Project [company_id#102, year#103, quarter#104, companyFinEventTs#137-T120000ms, start#147, end#148, min_revenue#131, max_revenue#133, mean_revenue#135, company_name#64, registeredYear#66, headQuarteredCity#67, companyid#74, companyInfoEventTs#81-T180000ms, current_timestamp() AS last_update_timestamp#200]
         +- Join Inner, (((company_id#102 = companyid#74) && (companyFinEventTs#137-T120000ms >= companyInfoEventTs#81-T180000ms)) && (companyFinEventTs#137-T120000ms <= cast(companyInfoEventTs#81-T180000ms + interval 1 minutes as timestamp)))
            :- EventTimeWatermark companyFinEventTs#137: timestamp, interval 2 minutes
            :  +- Project [company_id#102, year#103, quarter#104, companyFinEventTs#137, window#124.start AS start#147, window#124.end AS end#148, min_revenue#131, max_revenue#133, mean_revenue#135]
            :     +- Aggregate [window#138, company_id#102, year#103, quarter#104], [window#138 AS window#124, company_id#102, year#103, quarter#104, min(revenue#105) AS min_revenue#131, max(revenue#105) AS max_revenue#133, avg(cast(revenue#105 as bigint)) AS mean_revenue#135, first(eventTimeStamp#112-T60000ms, false) AS companyFinEventTs#137]
            :        +- Filter ((cast(eventTimeStamp#112-T60000ms as timestamp) >= window#138.start) && (cast(eventTimeStamp#112-T60000ms as timestamp) < window#138.end))
            :           +- Expand [ArrayBuffer(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) as double) = (cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) END + cast(0 as bigint)) - cast(2 as bigint)) * 20000000) + 10000000), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) as double) = (cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) END + cast(0 as bigint)) - cast(2 as bigint)) * 20000000) + 10000000) + 30000000), LongType, TimestampType)), company_id#102, year#103, quarter#104, revenue#105, eventTimeStamp#112-T60000ms), ArrayBuffer(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) as double) = (cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) END + cast(1 as bigint)) - cast(2 as bigint)) * 20000000) + 10000000), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) as double) = (cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) END + cast(1 as bigint)) - cast(2 as bigint)) * 20000000) + 10000000) + 30000000), 
LongType, TimestampType)), company_id#102, year#103, quarter#104, revenue#105, eventTimeStamp#112-T60000ms)], [window#138, company_id#102, year#103, quarter#104, revenue#105, eventTimeStamp#112-T60000ms]
            :              +- EventTimeWatermark eventTimeStamp#112: timestamp, interval 1 minutes
            :                 +- Project [company_id#102, year#103, quarter#104, revenue#105, eventTimeStamp#112]
            :                    +- Project [company_id#102, year#103, quarter#104, revenue#105, eventTs#106L, cast((cast(eventTs#106L as double) / cast(1000 as double)) as timestamp) AS eventTimeStamp#112]
            :                       +- Project [companyFin#100.company_id AS company_id#102, companyFin#100.year AS year#103, companyFin#100.quarter AS quarter#104, companyFin#100.revenue AS revenue#105, companyFin#100.eventTs AS eventTs#106L]
            :                          +- Project [jsontostructs(StructField(company_id,IntegerType,true), StructField(year,IntegerType,true), StructField(quarter,StringType,true), StructField(revenue,IntegerType,true), StructField(eventTs,LongType,true), cast(value#29 as string), Some(America/New_York)) AS companyFin#100]
            :                             +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@5f935d49, kafka, Map(includeTimestamp -> true, key.deserializer -> org.apache.kafka.common.serialization.StringDeserializer, retries -> 1, subscribe -> inbound_company_financials, group.id -> ProcessingGroup, enable.auto.commit -> false, failOnDataLoss -> false, value.deserializer -> com.spg.ca.prescore.serde.KafkaJsonCompanyFinRecordSerDe, kafka.bootstrap.servers -> localhost:9092, startingOffsets -> latest, linger.ms -> 5), [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@76af34b5,kafka,List(),None,List(),None,Map(includeTimestamp -> true, key.deserializer -> org.apache.kafka.common.serialization.StringDeserializer, retries -> 1, subscribe -> inbound_company_financials, group.id -> ProcessingGroup, enable.auto.commit -> false, failOnDataLoss -> false, value.deserializer -> com.spg.ca.prescore.serde.KafkaJsonCompanyFinRecordSerDe, kafka.bootstrap.servers -> localhost:9092, startingOffsets -> latest, linger.ms -> 5),None), kafka, [key#21, value#22, topic#23, partition#24, offset#25L, timestamp#26, timestampType#27]
            +- EventTimeWatermark companyInfoEventTs#81: timestamp, interval 3 minutes
               +- Project [company_name#64, registeredYear#66, headQuarteredCity#67, companyid#74, companyInfoEventTs#81-T60000ms]
                  +- Project [company_name#64, company_id#65, registeredYear#66, headQuarteredCity#67, companyid#74, companyInfoEventTs#81-T60000ms]
                     +- EventTimeWatermark companyInfoEventTs#81: timestamp, interval 1 minutes
                        +- Project [company_name#64, company_id#65, registeredYear#66, headQuarteredCity#67, eventTs#68L, companyid#74, cast((cast(eventTs#68L as double) / cast(1000 as double)) as timestamp) AS companyInfoEventTs#81]
                           +- Project [company_name#64, company_id#65, registeredYear#66, headQuarteredCity#67, eventTs#68L, cast(company_id#65 as int) AS companyid#74]
                              +- Project [companyInfo#62.company_name AS company_name#64, companyInfo#62.company_id AS company_id#65, companyInfo#62.registeredYear AS registeredYear#66, companyInfo#62.headQuarteredCity AS headQuarteredCity#67, companyInfo#62.eventTs AS eventTs#68L]
                                 +- Project [jsontostructs(StructField(company_name,StringType,true), StructField(company_id,IntegerType,true), StructField(registeredYear,IntegerType,true), StructField(headQuarteredCity,StringType,true), StructField(eventTs,LongType,true), cast(value#42 as string), Some(America/New_York)) AS companyInfo#62]
                                    +- Project [cast(value#8 as string) AS value#42, cast(topic#9 as string) AS topic#43, cast(partition#10 as int) AS partition#44, cast(offset#11L as bigint) AS offset#45L, cast(timestamp#12 as timestamp) AS timestamp#46]
                                       +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3313463c, kafka, Map(includeTimestamp -> true, key.deserializer -> org.apache.kafka.common.serialization.StringDeserializer, retries -> 1, subscribe -> inbound_company_info, group.id -> ProcessingGroup, enable.auto.commit -> false, failOnDataLoss -> false, value.deserializer -> com.spg.ca.prescore.serde.KafkaJsonCompanyInfoRecordSerDe, kafka.bootstrap.servers -> localhost:9092, startingOffsets -> latest, linger.ms -> 5), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@76af34b5,kafka,List(),None,List(),None,Map(includeTimestamp -> true, key.deserializer -> org.apache.kafka.common.serialization.StringDeserializer, retries -> 1, subscribe -> inbound_company_info, group.id -> ProcessingGroup, enable.auto.commit -> false, failOnDataLoss -> false, value.deserializer -> com.spg.ca.prescore.serde.KafkaJsonCompanyInfoRecordSerDe, kafka.bootstrap.servers -> localhost:9092, startingOffsets -> latest, linger.ms -> 5),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:111)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:250)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
    at com.spgmi.ca.prescore.utils.ConfigUtils.displayOnConsole(ConfigUtils.java:84)

What is the correct way to handle structured streaming joins and to use watermarks properly? Most of the documentation is not of much use; it covers only simple, easy scenarios that hold little value in real-life use cases.

Solution

AFAIK, Spark Structured Streaming cannot do joins after aggregations (or other non-map-like operations); see the support matrix for joins in streaming queries:

https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html#support-matrix-for-joins-in-streaming-queries

As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used.

  • Cannot use streaming aggregations before joins.

I think this is still true in Spark 2.4.
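
For illustration only, here is a minimal sketch of what a join within those limits could look like for this pipeline: the two raw streams from the question (company_info_df and comapany_fin_df, with the same map-like projections) are joined directly, with watermarks and an event-time range condition but without the groupBy aggregation before the join. Column names and durations simply mirror the question; this is a fragment that assumes the same surrounding setup as the question's code, not a drop-in fix.

    import static org.apache.spark.sql.functions.*;

    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.streaming.Trigger;
    import org.apache.spark.sql.types.DataTypes;

    // Raw financials stream: map-like transformations plus a watermark only.
    Dataset<Row> companyFin = comapany_fin_df
            .select("companyFin.*")
            .withColumn("companyFinEventTs",
                    col("eventTs").divide(1000).cast(DataTypes.TimestampType))
            .withWatermark("companyFinEventTs", "2 minutes");

    // Raw company-info stream; company_id is renamed so the join condition
    // below can tell the two sides apart (the question uses "companyid").
    Dataset<Row> companyInfo = company_info_df
            .select("companyInfo.*")
            .withColumnRenamed("company_id", "companyid")
            .withColumn("companyInfoEventTs",
                    col("eventTs").divide(1000).cast(DataTypes.TimestampType))
            .withWatermark("companyInfoEventTs", "3 minutes");

    // Equality key plus an event-time range constraint, as stream-stream
    // joins with watermarks require.
    Column joinExpr = expr("company_id = companyid"
            + " AND companyFinEventTs >= companyInfoEventTs"
            + " AND companyFinEventTs <= companyInfoEventTs + interval 1 minutes");

    // No streaming aggregation before the join, so append mode is accepted.
    Dataset<Row> joined = companyFin.join(companyInfo, joinExpr)
            .withColumn("last_update_timestamp", current_timestamp());

    joined.writeStream()
          .format("console")
          .outputMode("append")
          .option("truncate", false)
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .start();

The revenue min/max/avg aggregation is intentionally left out of this query; depending on the Spark version it may need to run downstream on the joined output, for example per micro-batch via foreachBatch, rather than being chained with the join in the same streaming query.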
