Structured Streaming exception when using append output mode with watermark
Question
Despite the fact that I'm using withWatermark(), I'm getting the following error message when I run my Spark job:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
From what I can see in the programming guide, this exactly matches the intended usage (and the example code). Does anyone know what might be wrong?

Thanks in advance!
Relevant code (Java 8, Spark 2.2.0):
    import static org.apache.spark.sql.functions.*;
    import static org.apache.spark.sql.types.DataTypes.*;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.Trigger;
    import org.apache.spark.sql.types.StructType;

    StructType logSchema = new StructType()
        .add("timestamp", TimestampType)
        .add("key", IntegerType)
        .add("val", IntegerType);

    Dataset<Row> kafka = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topics)
        .load();

    Dataset<Row> parsed = kafka
        .select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
        .select("parsed_value.*");

    Dataset<Row> tenSecondCounts = parsed
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            parsed.col("key"),
            window(parsed.col("timestamp"), "1 day"))
        .count();

    StreamingQuery query = tenSecondCounts
        .writeStream()
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .outputMode("append")
        .format("console")
        .option("truncate", false)
        .start();
Answer
The problem is in parsed.col. Replacing it with col will fix the issue. I would suggest always using the col function instead of Dataset.col.
Dataset.col returns a resolved column, while col returns an unresolved column.
parsed.withWatermark("timestamp", "10 minutes") will create a new Dataset with new columns that have the same names. The watermark information is attached to the timestamp column in the new Dataset, not to parsed.col("timestamp"), so the columns in groupBy don't have the watermark.
When you use unresolved columns, Spark will figure out the correct columns for you.
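To make the fix concrete, here is a sketch of the corrected aggregation. It assumes the `parsed` Dataset and schema from the question, and is a fragment that needs a running Spark session (plus the static import of `org.apache.spark.sql.functions.*`) rather than a standalone program:

```java
import static org.apache.spark.sql.functions.col;
import static org.apacheche.spark.sql.functions.window;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// ... given the `parsed` Dataset from the question ...
Dataset<Row> tenSecondCounts = parsed
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        col("key"),                         // unresolved: binds to the watermarked Dataset
        window(col("timestamp"), "1 day"))  // so the watermark is visible to the aggregation
    .count();
```

Because `col("key")` and `col("timestamp")` are unresolved, the analyzer resolves them against the Dataset produced by withWatermark, and append mode then works as described in the programming guide.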