How to use countDistinct in Scala with Spark?

Question

I've tried to use the countDistinct function, which should be available in Spark 1.5 according to the Databricks blog. However, I got the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function countDistinct;

I found that on the Spark developers' mailing list they suggest combining count and distinct to get the same result that countDistinct should produce:

count(distinct <columnName>)
// instead of
countDistinct(<columnName>)
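
To make the intended result concrete, here is a minimal plain-Scala model (no Spark dependency) of what grouping by A and then computing count(distinct B) returns. The object name CountDistinctSemantics and the tuple-based row shape are illustrative assumptions; the null handling follows SQL COUNT(DISTINCT ...), which ignores nulls.

```scala
// Hypothetical plain-Scala model of df.groupBy("A").agg(countDistinct("B")).
// Rows are (a, b) pairs; b may be null, and COUNT(DISTINCT ...) skips nulls.
object CountDistinctSemantics {
  def countDistinctBy(rows: Seq[(String, String)]): Map[String, Long] =
    rows
      .groupBy(_._1) // GROUP BY A
      .map { case (a, group) =>
        // keep only non-null B values, then deduplicate
        val distinctNonNull = group.map(_._2).filter(_ != null).distinct
        a -> distinctNonNull.size.toLong // Spark's count returns a long
      }
}
```

For example, the rows (x, p), (x, p), (x, q), (x, null), (y, r) give x -> 2 and y -> 1.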

Because I build aggregation expressions dynamically from a list of aggregation-function names, I'd prefer not to have any special cases that require different treatment.

So, is it possible to unify it by:

  • registering a new UDAF that acts as an alias for count(distinct columnName)
  • manually registering the CountDistinct function already implemented in Spark, which is probably one of the following imports:

import org.apache.spark.sql.catalyst.expressions.{CountDistinctFunction, CountDistinct}

or in some other way?

Example (with some local references and unnecessary code removed):

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Column, SQLContext, DataFrame}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer


class Flattener(sc: SparkContext) {
  val sqlContext = new SQLContext(sc)

  def flatTable(data: DataFrame, groupField: String): DataFrame = {
    val flatteningExpressions = data.columns.zip(TypeRecognizer.getTypes(data)).
      flatMap(x => getFlatteningExpressions(x._1, x._2)).toList

    data.groupBy(groupField).agg (
      expr(s"count($groupField) as groupSize"),
      flatteningExpressions:_*
    )
  }

  private def getFlatteningExpressions(fieldName: String, fieldType: DType): List[Column] = {
    val aggFuncs = getAggregationFunctions(fieldType)

    // Builds e.g. "avg(price) as price_avg" for every function name
    aggFuncs.map(f => expr(s"$f($fieldName) as ${fieldName}_$f"))
  }

  private def getAggregationFunctions(fieldType: DType): List[String] = {
    val aggFuncs = new ListBuffer[String]()

    if (fieldType == DType.NUMERIC) {
      aggFuncs ++= Seq("avg", "min", "max")
    }

    if (fieldType == DType.CATEGORY) {
      aggFuncs += "countDistinct" // this one fails with "undefined function countDistinct"
    }

    aggFuncs.toList
  }

}

Answer

countDistinct can be used in two different forms:

df.groupBy("A").agg(expr("count(distinct B)"))

df.groupBy("A").agg(countDistinct("B"))
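
Since the first form is just a SQL string, one way to keep the question's dynamic builder free of special cases at the call site is to move the single rewrite into a lookup table. The following is a minimal sketch; AggExprBuilder and its template table are hypothetical names, and the only assumption about Spark is that the resulting string is passed to functions.expr as in the question.

```scala
// Hypothetical helper: turns an aggregation-function name and a column name
// into a Spark SQL expression string for functions.expr(...).
object AggExprBuilder {
  // Functions whose SQL spelling differs from "name(column)".
  // Only countDistinct needs a rewrite in this sketch.
  private val templates: Map[String, String => String] = Map(
    "countDistinct" -> ((col: String) => s"count(distinct $col)")
  )

  /** Builds e.g. "avg(price) as price_avg" or "count(distinct city) as city_countDistinct". */
  def build(func: String, col: String): String = {
    // Fall back to the plain "func(col)" spelling used by avg, min, max, ...
    val render: String => String =
      templates.getOrElse(func, (c: String) => s"$func($c)")
    s"${render(col)} as ${col}_$func"
  }
}
```

In the question's getFlatteningExpressions this would become aggFuncs.map(f => expr(AggExprBuilder.build(f, fieldName))). This only solves the naming problem; it does not change which aggregate implementation Spark picks.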

However, neither of these forms works when you use it on the same column, in the same agg call, as a custom UDAF (implemented as a UserDefinedAggregateFunction in Spark 1.5):

// Assume a StdDev UDAF has already been implemented and registered
df.groupBy("A").agg(countDistinct("B"), expr("StdDev(B)"))

// This fails with:
Exception in thread "main" org.apache.spark.sql.AnalysisException: StdDev is implemented based on the new Aggregate Function interface and it cannot be used with functions implemented based on the old Aggregate Function interface.;

Due to these limitations, the most reasonable option seems to be implementing countDistinct as a UDAF. This lets all functions be treated the same way and allows countDistinct to be used alongside other UDAFs.

An example implementation looks like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Counts distinct non-null string values, mirroring count(distinct col) as a UDAF (Spark 1.5+)
class CountDistinct extends UserDefinedAggregateFunction {
  // A single string input column
  override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)

  // The buffer holds the distinct values seen so far
  override def bufferSchema: StructType = StructType(
    StructField("items", ArrayType(StringType, true)) :: Nil
  )

  // count(distinct ...) returns a long, so do the same here
  override def dataType: DataType = LongType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Seq[String]()
  }

  // Add the incoming value; nulls are skipped, as count(distinct ...) does
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = (buffer.getSeq[String](0).toSet + input.getString(0)).toSeq
    }
  }

  // Combine partial buffers coming from different partitions
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = (buffer1.getSeq[String](0).toSet ++ buffer2.getSeq[String](0)).toSeq
  }

  override def evaluate(buffer: Row): Any = buffer.getSeq[String](0).length.toLong
}
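
The buffer lifecycle Spark drives for this UDAF (initialize and update once per partition, then merge the partial buffers, then evaluate) can be checked without a cluster. Below is a plain-Scala simulation of the class above, assuming a Set stands in for the ArrayType buffer; CountDistinctSimulation is a hypothetical name, not a Spark API.

```scala
// Hypothetical simulation of the CountDistinct UDAF lifecycle, with no Spark dependency.
object CountDistinctSimulation {
  type Buffer = Set[String]

  def initialize: Buffer = Set.empty

  // Mirrors update(): nulls are ignored, other values are added to the set
  def update(buffer: Buffer, input: String): Buffer =
    if (input == null) buffer else buffer + input

  // Mirrors merge(): union of two partial buffers
  def merge(b1: Buffer, b2: Buffer): Buffer = b1 ++ b2

  // Mirrors evaluate(): the number of distinct values
  def evaluate(buffer: Buffer): Long = buffer.size.toLong

  /** Aggregates values that were split across several partitions. */
  def run(partitions: Seq[Seq[String]]): Long = {
    val partials = partitions.map(_.foldLeft(initialize)(update))
    evaluate(partials.foldLeft(initialize)(merge))
  }
}
```

To use the real UDAF, Spark 1.5 should let you register it with sqlContext.udf.register("countDistinct", new CountDistinct), after which expr("countDistinct(B)") ought to resolve to it. Note that the buffer keeps every distinct value of a group, so memory grows with the number of distinct values per group.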
