How to write windowed aggregation in CSV format?

Problem description

I am developing a Spark Structured Streaming application that streams CSV files and joins them with static data. After the join I do some aggregation.

While writing the query result to HDFS in CSV format, I get the following error:

19/01/09 14:00:30 ERROR MicroBatchExecution: Query [id = 830ca987-b55a-4c03-aa13-f71bc57e47ad, runId = 87cdb029-0022-4f1c-b55e-c2443c9f058a] terminated with error java.lang.UnsupportedOperationException: CSV data source does not support struct<start:timestamp,end:timestamp> data type.
    at org.apache.spark.sql.execution.datasources.csv.CSVUtils$.org$apache$spark$sql$execution$datasources$csv$CSVUtils$$verifyType$1(CSVUtils.scala:127)
    at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:131)
    at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:131)

What could be the root cause?

Here are the relevant parts of my code:

val spark = SparkSession
  .builder
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.streaming.checkpointLocation", "/user/sas/sparkCheckpoint")
  .getOrCreate

...

val df_agg_without_time = sqlResultjoin
  .withWatermark("event_time", "10 seconds")
  .groupBy(
    window($"event_time", "10 seconds", "5 seconds"),
    $"section",
    $"timestamp")
  .agg(sum($"total") as "total")

...

finalTable_repo
  .writeStream
  .outputMode("append")
  .partitionBy("xml_data_dt")
  .format("csv")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .option("path", "hdfs://op/apps/hive/warehouse/area.db/finalTable_repo")
  .start

Answer

The line where you do the aggregation, .groupBy(window($"event_time", "10 seconds", "5 seconds"), $"section", $"timestamp"), creates the struct<start:timestamp,end:timestamp> data type, which the CSV data source does not support.

Just run df_agg_without_time.printSchema and you will see the column.

A solution is simply to transform it to some other, simpler type (possibly with select or withColumn), or just select it out (i.e. not include it in the resulting DataFrame), as sketched below.
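
For example, a minimal sketch (assuming the df_agg_without_time from the question; the column names window_start and window_end are just illustrative) that pulls the nested timestamps out of the window struct so only CSV-friendly atomic types remain:

// Flatten the window struct into two top-level timestamp columns,
// then drop the struct itself.
val csvFriendly = df_agg_without_time
  .withColumn("window_start", $"window.start")
  .withColumn("window_end", $"window.end")
  .drop("window")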

The following is a sample batch (non-streaming) structured query that shows the schema your streaming structured query uses (when you create df_agg_without_time).

val q = spark
  .range(4)
  .withColumn("t", current_timestamp)
  .groupBy(window($"t", "10 seconds"))
  .count
scala> q.printSchema
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)
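
Continuing the batch example (the output path below is just a placeholder), selecting the struct's fields out leaves only atomic types, so a plain CSV write succeeds:

// Keep only atomic columns: the struct's start/end fields and the count.
val flat = q.select($"window.start" as "start", $"window.end" as "end", $"count")

// The flattened frame writes without the error; the path is an example only.
flat.write.csv("/tmp/windowed-counts")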

For a sample streaming query, you could use the rate data source.

val q = spark
  .readStream
  .format("rate")
  .load
  .groupBy(window($"timestamp", "10 seconds"))
  .count
scala> q.printSchema
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)
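
Putting it together (still assuming the same spark-shell session; output and checkpoint paths are placeholders), a streaming variant that flattens the window column before the CSV sink could look like the sketch below. Note that a file sink only supports append output mode, which for a streaming aggregation requires a watermark:

// Rate source -> watermarked windowed count -> flattened columns -> CSV sink.
val flatStream = spark
  .readStream
  .format("rate")
  .load
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "10 seconds"))
  .count
  .select(
    $"window.start" as "window_start",
    $"window.end" as "window_end",
    $"count")

flatStream
  .writeStream
  .outputMode("append")
  .format("csv")
  .option("path", "/tmp/rate-window-csv")                 // placeholder path
  .option("checkpointLocation", "/tmp/rate-window-ckpt")  // placeholder path
  .start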
