Spark UDAF - using generics as input type?


Problem description

I want to write a Spark UDAF where the column's type could be any type that has a Scala Numeric defined on it. I've searched the Internet but found only examples with concrete types like DoubleType or LongType. Isn't this possible? And if it is, how can such a UDAF be used with other numeric values?

Recommended answer

For simplicity, let's assume you want to define a custom sum. You'll have to provide a TypeTag for the input type and use Scala reflection to define the schemas:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import scala.reflect.runtime.universe._
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor

case class MySum[T : TypeTag](implicit n: Numeric[T])
    extends UserDefinedAggregateFunction {

  // Derive the Spark SQL DataType for T via Scala reflection
  val dt = schemaFor[T].dataType
  def inputSchema = new StructType().add("x", dt)
  def bufferSchema = new StructType().add("x", dt)

  def dataType = dt
  def deterministic = true

  // Start the running sum at the Numeric's zero element
  def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, n.zero)

  // Add each non-null input value to the running sum
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0))
      buffer.update(0, n.plus(buffer.getAs[T](0), input.getAs[T](0)))
  }

  // Combine partial sums from different partitions
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1.update(0, n.plus(buffer1.getAs[T](0), buffer2.getAs[T](0)))
  }

  def evaluate(buffer: Row) = buffer.getAs[T](0)
}

With the function defined above, we can create instances that handle specific types:

import spark.implicits._  // for the $"..." column syntax

val sumOfLong = MySum[Long]
spark.range(10).select(sumOfLong($"id")).show

+---------+
|mysum(id)|
+---------+
|       45|
+---------+
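
Because MySum is generic over any T with a Numeric instance, the same definition works for other numeric columns as well. A minimal sketch, assuming a SparkSession named spark with spark.implicits._ in scope (the sample data is hypothetical):

val sumOfDouble = MySum[Double]
val doubles = Seq(1.5, 2.5, 3.0).toDF("x")  // hypothetical sample data
doubles.select(sumOfDouble($"x")).show
// expected: a single row with mysum(x) = 7.0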

Note

To get the same flexibility as the built-in aggregate functions, you'd have to define your own AggregateFunction, such as an ImperativeAggregate or a DeclarativeAggregate. It is possible, but it is an internal API.
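
Short of using that internal API, you can still call the aggregate by name from SQL by registering each concrete instance separately. A sketch assuming a SparkSession named spark and a Spark version where UDFRegistration.register accepts a UserDefinedAggregateFunction (the function name "mysum_long" is made up here); note that, unlike the built-ins, every input type still needs its own registration:

spark.udf.register("mysum_long", MySum[Long])
spark.sql("SELECT mysum_long(id) AS total FROM range(10)").show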
