Structured Streaming exception when using append output mode with watermark


Problem Description

Despite the fact that I'm using withWatermark(), I'm getting the following error message when I run my Spark job:


Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;

From what I can see in the programming guide, this exactly matches the intended usage (and the example code). Does anyone know what might be wrong?

Thanks in advance!

Relevant code (Java 8, Spark 2.2.0):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.types.DataTypes.*;

StructType logSchema = new StructType()
        .add("timestamp", TimestampType)
        .add("key", IntegerType)
        .add("val", IntegerType);

Dataset<Row> kafka = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topics)
        .load();

Dataset<Row> parsed = kafka
        .select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
        .select("parsed_value.*");

Dataset<Row> tenSecondCounts = parsed
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            parsed.col("key"),
            window(parsed.col("timestamp"), "1 day"))
        .count();

StreamingQuery query = tenSecondCounts
        .writeStream()
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .outputMode("append")
        .format("console")
        .option("truncate", false)
        .start();


Recommended Answer

The problem is in parsed.col. Replacing it with col will fix the issue. I would suggest always using the col function instead of Dataset.col.

Dataset.col returns a resolved column, while col returns an unresolved column.

parsed.withWatermark("timestamp", "10 minutes") will create a new Dataset with new columns that have the same names. The watermark information is attached to the timestamp column in the new Dataset, not to parsed.col("timestamp"), so the columns used in groupBy don't carry the watermark.

When you use unresolved columns, Spark will figure out the correct columns for you.
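Concretely, the fix is to reference the grouping columns with the unresolved col() function so that Spark resolves them against the watermarked Dataset returned by withWatermark(), not against the original parsed Dataset. A sketch of the corrected aggregation, using the same schema and column names as in the question (static imports of col and window from org.apache.spark.sql.functions assumed):

```java
Dataset<Row> tenSecondCounts = parsed
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            // Unresolved columns: these are resolved against the Dataset
            // produced by withWatermark(), so the aggregation sees the
            // watermarked timestamp column and append mode is allowed.
            col("key"),
            window(col("timestamp"), "1 day"))
        .count();
```

With this change the analyzer finds a watermark on the aggregated event-time column, and the AnalysisException for append output mode no longer occurs.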
