Structured Streaming groupBy agg collect_list: "Collect cannot be used in partial aggregations"


Problem description

stateDF
.withWatermark("t","1 seconds")
.groupBy(window($"t","1 minutes","1 minutes"),$"hid")
.agg(collect_list("id"))
.writeStream.outputMode("append")
.format("console").trigger(ProcessingTime("1 minutes"))
.start().awaitTermination()

When I add `collect_list` to the aggregation, I get the error below. The same thing can be done with Spark core. Error:

java.lang.RuntimeException: Collect cannot be used in partial aggregations.
    at scala.sys.package$.error(package.scala:27)
    at ...
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Recommended answer

After some digging, I found two ways around this problem.

Method 1: Modify the Spark source code with SPARK-1893. I don't recommend doing this.

Method 2: Write your own user-defined aggregate function (UDAF). This is more work, but it is effective. My code is below; corrections are welcome!

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

// Collects "id_state" strings per group; a stand-in for collect_list in a streaming aggregation.
class CollectList extends UserDefinedAggregateFunction {

  // Two string input columns: id and state.
  override def inputSchema: StructType = StructType(StructField("id", StringType, nullable = true) :: StructField("state", StringType, nullable = true) :: Nil)

  // The buffer holds the strings collected so far.
  override def bufferSchema: StructType = StructType(StructField("ids", ArrayType(StringType, containsNull = true), nullable = true) :: Nil)

  override def dataType: ArrayType = ArrayType(StringType, containsNull = true)

  // Element order depends on how partial buffers are merged, so the output is not deterministic.
  override def deterministic: Boolean = false

  // Start from an empty list so that update and merge never see a null buffer.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Seq.empty[String]
  }

  // Append "id_state" for the current row.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val s = input.getString(0) + "_" + input.getString(1)
    buffer(0) = buffer.getAs[Seq[String]](0) :+ s
  }

  // Combine two partial buffers. Note that distinct drops duplicate entries here;
  // remove it if every occurrence must be kept, as collect_list does.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = (buffer1.getAs[Seq[String]](0) ++ buffer2.getAs[Seq[String]](0)).distinct
  }

  override def evaluate(buffer: Row): Any = buffer.getAs[Seq[String]](0)

}
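
The UDAF above splits its work into an update step (one row at a time, per partition) and a merge step (combining partial buffers). As a rough illustration of how the two steps interact, here is a minimal plain-Scala sketch that needs no Spark at all. The object name `PartialAggregationModel` and the helper `aggregatePartition` are hypothetical and exist only for this example; the sample rows are made up.

```scala
// Plain-Scala model of the UDAF's update and merge steps (no Spark required).
// All names in this object are hypothetical and only serve the illustration.
object PartialAggregationModel {

  // update: append one "id_state" entry to a partition-local buffer.
  def update(buffer: Seq[String], id: String, state: String): Seq[String] =
    buffer :+ (id + "_" + state)

  // merge: combine two partial buffers; distinct mirrors the merge step of the UDAF.
  def merge(left: Seq[String], right: Seq[String]): Seq[String] =
    (left ++ right).distinct

  // Fold all rows of one partition into a single partial buffer.
  def aggregatePartition(rows: Seq[(String, String)]): Seq[String] =
    rows.foldLeft(Seq.empty[String]) { case (buffer, (id, state)) =>
      update(buffer, id, state)
    }

  def main(args: Array[String]): Unit = {
    val partition1 = Seq(("a", "on"), ("b", "off"))
    val partition2 = Seq(("a", "on"), ("c", "on"))
    val merged = merge(aggregatePartition(partition1), aggregatePartition(partition2))
    // "a_on" occurs in both partitions but appears once after the merge.
    println(merged.mkString(", "))
  }
}
```

The sketch shows that duplicates inside one partition survive, while duplicates across partitions are dropped at merge time, so the result is not quite what `collect_list` would return. In the query from the question, the UDAF itself would presumably be applied as `.agg(new CollectList()($"id", $"state"))` in place of `collect_list("id")`, assuming `stateDF` also has a `state` column, which the question does not show.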
