How to define and use a User-Defined Aggregate Function in Spark SQL?


Problem description


I know how to write a UDF in Spark SQL:

def belowThreshold(power: Int): Boolean = {
  return power < -40
}

sqlContext.udf.register("belowThreshold", belowThreshold _)

Can I do something similar to define an aggregate function? How is this done?

For context, I want to run the following SQL query:

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                                    FROM ifDF
                                    WHERE opticalReceivePower IS NOT null
                                    GROUP BY span, timestamp
                                    ORDER BY span""")

It should return something like

Row(span1, false, T0)

I want the aggregate function to tell me whether there are any values of opticalReceivePower in the groups defined by span and timestamp that are below the threshold. Do I need to write my UDAF differently from the UDF I pasted above?

Solution

Supported methods

Spark >= 3.0

The Scala UserDefinedAggregateFunction has been deprecated (SPARK-30423 Deprecate UserDefinedAggregateFunction) in favor of a registered Aggregator.
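
As a rough sketch of that route (assuming Spark >= 3.0 and an existing SparkSession named spark; the -40 threshold is hard-coded and the object name is mine, not from the original code):

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf
import org.apache.spark.sql.{Encoder, Encoders}

// "Did any value fall below -40?" over non-null Int inputs
// (the question's query already filters out nulls in its WHERE clause).
object BelowThresholdAgg extends Aggregator[Int, Boolean, Boolean] {
  def zero: Boolean = false
  def reduce(acc: Boolean, power: Int): Boolean = acc || power < -40
  def merge(acc1: Boolean, acc2: Boolean): Boolean = acc1 || acc2
  def finish(acc: Boolean): Boolean = acc
  def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
  def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

// functions.udaf wraps the Aggregator as an untyped UDF that can be registered for SQL.
spark.udf.register("belowThreshold", udaf(BelowThresholdAgg))

// spark.sql("SELECT span, belowThreshold(opticalReceivePower) FROM ifDF GROUP BY span")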

Spark >= 2.3

Vectorized UDF (Python only):

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

from pyspark.sql.types import *
import pandas as pd

df = sc.parallelize([
    ("a", 0), ("a", 1), ("b", 30), ("b", -50)
]).toDF(["group", "power"])

def below_threshold(threshold, group="group", power="power"):
    @pandas_udf("struct<group: string, below_threshold: boolean>", PandasUDFType.GROUPED_MAP)
    def below_threshold_(df):
        df = pd.DataFrame(
           df.groupby(group).apply(lambda x: (x[power] < threshold).any()))
        df.reset_index(inplace=True, drop=False)
        return df

    return below_threshold_

Example usage:

df.groupBy("group").apply(below_threshold(-40)).show()

## +-----+---------------+
## |group|below_threshold|
## +-----+---------------+
## |    b|           true|
## |    a|          false|
## +-----+---------------+

See also Applying UDFs on GroupedData in PySpark (with a functioning Python example)

Spark >= 2.0 (also 1.6, but with a slightly different API):

It is possible to use Aggregators on typed Datasets:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

class BelowThreshold[I](f: I => Boolean)  extends Aggregator[I, Boolean, Boolean]
    with Serializable {
  def zero = false
  def reduce(acc: Boolean, x: I) = acc | f(x)
  def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2
  def finish(acc: Boolean) = acc

  def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
  def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

val belowThreshold = new BelowThreshold[(String, Int)](_._2 < -40).toColumn
df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)
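
Since df in that snippet is not defined on the Scala side, here is a hedged usage sketch on toy data mirroring the Python example above (assumes a SparkSession named spark with its implicits in scope):

import spark.implicits._

val ds = Seq(("a", 0), ("a", 1), ("b", 30), ("b", -50)).toDS()

ds.groupByKey(_._1)
  .agg(belowThreshold.name("belowThreshold"))
  .show()
// Expect one row per group: "b" true (it contains -50), "a" false.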

Spark >= 1.5:

In Spark 1.5 you can create a UDAF like this, although it is most likely overkill:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

object belowThreshold extends UserDefinedAggregateFunction {
    // Schema you get as an input
    def inputSchema = new StructType().add("power", IntegerType)
    // Schema of the row which is used for aggregation
    def bufferSchema = new StructType().add("ind", BooleanType)
    // Returned type
    def dataType = BooleanType
    // Self-explanatory
    def deterministic = true
    // zero value
    def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false)
    // Similar to seqOp in aggregate
    def update(buffer: MutableAggregationBuffer, input: Row) = {
        if (!input.isNullAt(0))
          buffer.update(0, buffer.getBoolean(0) | input.getInt(0) < -40)
    }
    // Similar to combOp in aggregate
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0))    
    }
    // Called on exit to get return value
    def evaluate(buffer: Row) = buffer.getBoolean(0)
}

Example usage:

df
  .groupBy($"group")
  .agg(belowThreshold($"power").alias("belowThreshold"))
  .show

// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+

Spark 1.4 workaround:

I am not sure if I correctly understand your requirements, but as far as I can tell, plain old aggregation should be enough here:

val df = sc.parallelize(Seq(
    ("a", 0), ("a", 1), ("b", 30), ("b", -50))).toDF("group", "power")

df
  .withColumn("belowThreshold", ($"power".lt(-40)).cast(IntegerType))
  .groupBy($"group")
  .agg(sum($"belowThreshold").notEqual(0).alias("belowThreshold"))
  .show

// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+

Spark <= 1.4:

As far as I know, at this moment (Spark 1.4.1) there is no support for UDAFs other than the Hive ones. It should be possible with Spark 1.5 (see SPARK-3947).
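
For reference, in those versions an existing Hive UDAF packaged in a jar can be exposed through a HiveContext; the jar path and class name below are placeholders I made up, not anything from the question:

// Assumes sqlContext is actually a HiveContext and that the jar and class exist.
sqlContext.sql("ADD JAR /path/to/custom-udafs.jar")
sqlContext.sql("CREATE TEMPORARY FUNCTION belowThreshold AS 'com.example.hive.BelowThresholdUDAF'")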

Unsupported / internal methods

Internally Spark uses a number of classes including ImperativeAggregates and DeclarativeAggregates.

These are intended for internal usage and may change without notice, so they are probably not something you want to use in your production code, but just for completeness, BelowThreshold with DeclarativeAggregate could be implemented like this (tested with Spark 2.2-SNAPSHOT):

import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

case class BelowThreshold(child: Expression, threshold: Expression) 
    extends  DeclarativeAggregate  {
  override def children: Seq[Expression] = Seq(child, threshold)

  override def nullable: Boolean = false
  override def dataType: DataType = BooleanType

  private lazy val belowThreshold = AttributeReference(
    "belowThreshold", BooleanType, nullable = false
  )()

  // Used to derive schema
  override lazy val aggBufferAttributes = belowThreshold :: Nil

  override lazy val initialValues = Seq(
    Literal(false)
  )

  override lazy val updateExpressions = Seq(Or(
    belowThreshold,
    If(IsNull(child), Literal(false), LessThan(child, threshold))
  ))

  override lazy val mergeExpressions = Seq(
    Or(belowThreshold.left, belowThreshold.right)
  )

  override lazy val evaluateExpression = belowThreshold
  override def defaultResult: Option[Literal] = Option(Literal(false))
} 

It should be further wrapped with an equivalent of withAggregateFunction.
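
For completeness, a rough sketch of what such a wrapper could look like, mirroring the private functions.withAggregateFunction helper inside Spark; it leans on internal Catalyst APIs from the 2.x line and is illustrative only:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction

// Wrap an internal AggregateFunction into a user-facing Column.
def withAggregateFunction(func: AggregateFunction, isDistinct: Boolean = false): Column =
  new Column(func.toAggregateExpression(isDistinct))

// Expose the BelowThreshold defined above with a fixed -40 threshold.
def belowThreshold(child: Column): Column =
  withAggregateFunction(BelowThreshold(child.expr, lit(-40).expr))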
