Spark - 流式数据帧/数据集不支持非基于时间的窗口; [英] Spark - Non-time-based windows are not supported on streaming DataFrames/Datasets;
问题描述
我需要使用内部选择和分区来编写 Spark sql 查询.问题是我有 AnalysisException.我已经在这上面花了几个小时,但使用其他方法我没有成功.
异常:
线程main"org.apache.spark.sql.AnalysisException 中的异常:流式数据帧/数据集不支持非基于时间的窗口;;窗口 [sum(cast(_w0#41 as bigint)) windowspecdefinition(deviceId#28, timestamp#30 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS grp#34L], [deviceId#28], [timestamp#30ASC NULLS 首先]+- 项目 [currentTemperature#27,deviceId#28,status#29,timestamp#30,wantedTemperature#31,CASE WHEN (status#29 = cast(false as boolean)) THEN 1 ELSE 0 END AS _w0#41]
我认为这太复杂了,无法像这样实现.但我不知道要修复它.
SparkSession spark = SparkUtils.getSparkSession("RawModel");数据集<RawModel>datasetMap = readFromKafka(spark);datasetMap.registerTempTable("test");数据集<行>res = datasetMap.sqlContext().sql("" +" 选择 deviceId, grp, avg(currentTemperature) as averageT, min(timestamp) as minTime , max(timestamp) as maxTime, count(*) as countFrame " +" from (select test.*, sum(case when status = 'false' then 1 else 0 end) over (partition by deviceId order by timestamp) as grp " +来自测试" +" ) 测试" +" group by deviceid, grp ");
任何建议将不胜感激.谢谢.
我认为问题出在 windowing 规范中:
over (partition by deviceId order by timestamp)
partition 需要超过基于时间的列 - 在您的情况下 timestamp .以下应该有效:
over(按时间戳顺序按时间戳分区)
这当然不会解决您查询的意图.可能会尝试以下操作:但尚不清楚 spark 是否支持它:
over(按时间戳分区,deviceId 按时间戳排序)
即使 spark 确实支持它仍然会改变您查询的语义.
更新
这是一个权威来源:来自 Tathagata Das,他是 spark 流 的关键/核心提交者:
I need to write Spark sql query with inner select and partition by. Problem is that I have AnalysisException. I already spend few hours on this but with other approach I have no success.
Exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets;;
Window [sum(cast(_w0#41 as bigint)) windowspecdefinition(deviceId#28, timestamp#30 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS grp#34L], [deviceId#28], [timestamp#30 ASC NULLS FIRST]
+- Project [currentTemperature#27, deviceId#28, status#29, timestamp#30, wantedTemperature#31, CASE WHEN (status#29 = cast(false as boolean)) THEN 1 ELSE 0 END AS _w0#41]
I assume that this is too complicated query to implement like this. But i don't know to to fix it.
SparkSession spark = SparkUtils.getSparkSession("RawModel");
Dataset<RawModel> datasetMap = readFromKafka(spark);
datasetMap.registerTempTable("test");
Dataset<Row> res = datasetMap.sqlContext().sql("" +
" select deviceId, grp, avg(currentTemperature) as averageT, min(timestamp) as minTime ,max(timestamp) as maxTime, count(*) as countFrame " +
" from (select test.*, sum(case when status = 'false' then 1 else 0 end) over (partition by deviceId order by timestamp) as grp " +
" from test " +
" ) test " +
" group by deviceid, grp ");
Any suggestion would be very appreciated. Thank you.
I believe the issue is in the windowing specification:
over (partition by deviceId order by timestamp)
The partition would need to be over a time based column - in your case timestamp . The following should work:
over (partition by timestamp order by timestamp)
That will not of course address the intent of your query. The following might be attempted: but it is unclear whether spark would support it:
over (partition by timestamp, deviceId order by timestamp)
Even if spark does support that it would still change the semantics of your query.
Update
Here is a definitive source: from Tathagata Das who is a key/core committer on spark streaming: http://apache-spark-user-list.1001560.n3.nabble.com/Does-partition-by-and-order-by-works-only-in-stateful-case-td31816.html
这篇关于Spark - 流式数据帧/数据集不支持非基于时间的窗口;的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!