How to define and use a User-Defined Aggregate Function in Spark SQL?


Question

I know how to write a UDF in Spark SQL:

def belowThreshold(power: Int): Boolean = {
  power < -40
}

sqlContext.udf.register("belowThreshold", belowThreshold _)

Can I do something similar to define an aggregate function? How is this done?

For context, I want to run the following SQL query:

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                              FROM ifDF
                              WHERE opticalReceivePower IS NOT NULL
                              GROUP BY span, timestamp
                              ORDER BY span""")

It should return something like

Row(span1, false, T0)

I want the aggregate function to tell me if there's any values for opticalReceivePower in the groups defined by span and timestamp which are below the threshold. Do I need to write my UDAF differently to the UDF I pasted above?

Answer

Supported methods



Spark >= 3.0

Scala UserDefinedAggregateFunction is being deprecated (SPARK-30423 Deprecate UserDefinedAggregateFunction) in favor of registered Aggregator.
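For example, here is a minimal sketch of the registered-Aggregator approach. It assumes Spark 3.0+, a SparkSession named spark, and an illustrative object name BelowThresholdAgg; the -40 threshold comes from the question:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// Same logic as the Aggregator shown further below, specialized to Int input
object BelowThresholdAgg extends Aggregator[Int, Boolean, Boolean] {
  def zero: Boolean = false
  def reduce(acc: Boolean, power: Int): Boolean = acc || power < -40
  def merge(acc1: Boolean, acc2: Boolean): Boolean = acc1 || acc2
  def finish(acc: Boolean): Boolean = acc
  def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
  def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

// Register it under a SQL name so it can be called from SQL queries
spark.udf.register("belowThreshold", udaf(BelowThresholdAgg, Encoders.scalaInt))

Once registered, belowThreshold(opticalReceivePower) should be usable in the GROUP BY query from the question without changes.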

Spark >= 2.3

Vectorized udf (Python only):

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

from pyspark.sql.types import *
import pandas as pd

df = sc.parallelize([
    ("a", 0), ("a", 1), ("b", 30), ("b", -50)
]).toDF(["group", "power"])

def below_threshold(threshold, group="group", power="power"):
    @pandas_udf("struct<group: string, below_threshold: boolean>", PandasUDFType.GROUPED_MAP)
    def below_threshold_(df):
        df = pd.DataFrame(
           df.groupby(group).apply(lambda x: (x[power] < threshold).any()))
        df.reset_index(inplace=True, drop=False)
        return df

    return below_threshold_

Example usage:

df.groupBy("group").apply(below_threshold(-40)).show()

## +-----+---------------+
## |group|below_threshold|
## +-----+---------------+
## |    b|           true|
## |    a|          false|
## +-----+---------------+

See also Applying UDFs on GroupedData in PySpark (with a functioning Python example).

Spark >= 2.0 (optionally 1.6 but with slightly different API):

Use Aggregator with typed Datasets:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

class BelowThreshold[I](f: I => Boolean) extends Aggregator[I, Boolean, Boolean]
    with Serializable {
  def zero = false
  def reduce(acc: Boolean, x: I) = acc | f(x)
  def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2
  def finish(acc: Boolean) = acc

  def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
  def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

val belowThreshold = new BelowThreshold[(String, Int)](_._2 < -40).toColumn
df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)
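A sketch of running this end to end, assuming a SparkSession named spark and the same toy data used in the other examples (the expected result in the trailing comment follows from the -40 threshold):

import spark.implicits._

val df = Seq(("a", 0), ("a", 1), ("b", 30), ("b", -50)).toDF("group", "power")

val belowThreshold = new BelowThreshold[(String, Int)](_._2 < -40).toColumn

df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold).show()
// Expected: ("a", false), ("b", true)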

Spark >= 1.5

In Spark 1.5 you can create a UDAF like this, although it is most likely overkill:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

object belowThreshold extends UserDefinedAggregateFunction {
    // Schema you get as an input
    def inputSchema = new StructType().add("power", IntegerType)
    // Schema of the row which is used for aggregation
    def bufferSchema = new StructType().add("ind", BooleanType)
    // Returned type
    def dataType = BooleanType
    // Self-explaining 
    def deterministic = true
    // zero value
    def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false)
    // Similar to seqOp in aggregate
    def update(buffer: MutableAggregationBuffer, input: Row) = {
        if (!input.isNullAt(0))
          buffer.update(0, buffer.getBoolean(0) | input.getInt(0) < -40)
    }
    // Similar to combOp in aggregate
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0))    
    }
    // Called on exit to get return value
    def evaluate(buffer: Row) = buffer.getBoolean(0)
}

Example usage:

df
  .groupBy($"group")
  .agg(belowThreshold($"power").alias("belowThreshold"))
  .show

// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+
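Since the question is about SQL, the UDAF can also be registered under a name and used in the original query. This is a sketch: it assumes ifDF has been registered as a temporary table and that the register(name, udaf) overload of sqlContext.udf is available (added with UDAF support in 1.5):

sqlContext.udf.register("belowThreshold", belowThreshold)

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                              FROM ifDF
                              WHERE opticalReceivePower IS NOT NULL
                              GROUP BY span, timestamp
                              ORDER BY span""")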

Spark 1.4 workaround

I am not sure if I correctly understand your requirements but as far as I can tell plain old aggregation should be enough here:

val df = sc.parallelize(Seq(
    ("a", 0), ("a", 1), ("b", 30), ("b", -50))).toDF("group", "power")

df
  .withColumn("belowThreshold", ($"power".lt(-40)).cast(IntegerType))
  .groupBy($"group")
  .agg(sum($"belowThreshold").notEqual(0).alias("belowThreshold"))
  .show

// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+

Spark <= 1.4

As far as I know, at this moment (Spark 1.4.1), there is no support for UDAF, other than the Hive ones. It should be possible with Spark 1.5 (see SPARK-3947).

Internally Spark uses a number of classes including ImperativeAggregates and DeclarativeAggregates.

These are intended for internal usage and may change without notice, so they are probably not something you want to use in your production code, but just for completeness, BelowThreshold with DeclarativeAggregate could be implemented like this (tested with Spark 2.2-SNAPSHOT):

import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

case class BelowThreshold(child: Expression, threshold: Expression)
    extends DeclarativeAggregate {
  override def children: Seq[Expression] = Seq(child, threshold)

  override def nullable: Boolean = false
  override def dataType: DataType = BooleanType

  private lazy val belowThreshold = AttributeReference(
    "belowThreshold", BooleanType, nullable = false
  )()

  // Used to derive schema
  override lazy val aggBufferAttributes = belowThreshold :: Nil

  override lazy val initialValues = Seq(
    Literal(false)
  )

  override lazy val updateExpressions = Seq(Or(
    belowThreshold,
    If(IsNull(child), Literal(false), LessThan(child, threshold))
  ))

  override lazy val mergeExpressions = Seq(
    Or(belowThreshold.left, belowThreshold.right)
  )

  override lazy val evaluateExpression = belowThreshold
  override def defaultResult: Option[Literal] = Option(Literal(false))
} 

It should be further wrapped with an equivalent of withAggregateFunction.
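For completeness, a minimal sketch of such a wrapper (again relying on internal APIs, so treat it as illustrative only; the function name belowThresholdAgg is made up here):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.lit

// Wraps the catalyst aggregate in a Column, roughly what withAggregateFunction does
def belowThresholdAgg(power: Column, threshold: Column): Column =
  new Column(BelowThreshold(power.expr, threshold.expr).toAggregateExpression())

// Hypothetical usage:
// df.groupBy($"group").agg(belowThresholdAgg($"power", lit(-40)).alias("belowThreshold"))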
