How to pivot streaming dataset?

Problem Description

I am trying to pivot a Spark streaming Dataset (Structured Streaming), but I get an AnalysisException (excerpt below).

Could someone confirm that pivoting is indeed not supported in Structured Streaming (Spark 2.0), and perhaps suggest alternative approaches?

线程主"中的异常org.apache.spark.sql.AnalysisException:具有流源的查询必须使用writeStream.start();执行; 卡夫卡 在org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .org $ apache $ spark $ sql $ catalyst $ analysis $ UnsupportedOperationChecker $$ throwError(UnsupportedOperationChecker.scala:297) 在org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $$ anonfun $ checkForBatch $ 1.apply(UnsupportedOperationChecker.scala:36) 在org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $$ anonfun $ checkForBatch $ 1.apply(UnsupportedOperationChecker.scala:34) 在org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; kafka at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)

Answer

tl;dr pivot aggregation is not directly supported by Spark Structured Streaming up to and including 2.4.4.

As a workaround, use DataStreamWriter.foreachBatch or the more generic DataStreamWriter.foreach.
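
The answer stops at naming the workaround, so here is a minimal sketch (not part of the original answer) of what it could look like with the same rate source on Spark 2.4.x. Inside foreachBatch every micro-batch arrives as a regular batch DataFrame, so groupBy(...).pivot(...) and its internal collect are allowed again; the columns "value" and "timestamp" are the ones the rate source produces.

// Minimal foreachBatch sketch (illustration only, not from the original answer)
import org.apache.spark.sql.DataFrame

val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Each micro-batch is a plain batch DataFrame, so pivot is permitted here
    batchDF
      .groupBy("value")
      .pivot("timestamp")
      .count
      .show(truncate = false)
  }
  .start

Keep in mind that each micro-batch only sees its own data, so the pivoted columns can differ from batch to batch; pivoting across the whole stream would require keeping state yourself (e.g. with the more generic foreach writer or an external store). Also, on Spark 2.4.x built with Scala 2.12 the foreachBatch lambda may hit an overload ambiguity and need an explicitly typed function value.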

I use the latest version of Spark 2.4.4 as of now.

scala> spark.version
res0: String = 2.4.4

UnsupportedOperationChecker (that you can find in the stack trace) checks whether (the logical plan of) a streaming query uses supported operations only.

When you execute pivot you have to groupBy first, as that's the only interface that makes pivot available.

There are two issues with pivot:

  1. pivot wants to know how many columns to generate values for and hence does a collect, which is not possible with streaming Datasets.

  2. pivot is actually another aggregation (beside groupBy) that Spark Structured Streaming does not support

Let's look at issue 1, with no columns to pivot on defined.

val sq = spark
  .readStream
  .format("rate")
  .load
  .groupBy("value")
  .pivot("timestamp") // <-- pivot with no values
  .count
  .writeStream
  .format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
rate
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
  at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:384)
  ... 49 elided

The last two lines show the issue: pivot does a collect under the covers, which is what triggers the exception.

The other issue is that even if you specify the values of the columns to pivot on, you then hit the multiple-aggregations problem (and you can see that this time the check is actually for streaming, not for batch as in the first case).

val sq = spark
  .readStream
  .format("rate")
  .load
  .groupBy("value")
  .pivot("timestamp", Seq(1)) // <-- pivot with explicit values
  .count
  .writeStream
  .format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
Project [value#128L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#141[0] AS 1#142L]
+- Aggregate [value#128L], [value#128L, pivotfirst(timestamp#127, count(1) AS `count`#137L, 1000000, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#141]
   +- Aggregate [value#128L, timestamp#127], [value#128L, timestamp#127, count(1) AS count(1) AS `count`#137L]
      +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dd63368,rate,List(),None,List(),None,Map(),None), rate, [timestamp#127, value#128L]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:93)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:250)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
  ... 49 elided
