UDAF in Spark with multiple input columns


Problem description

I am trying to develop a user-defined aggregate function that computes a linear regression on a row of numbers. I have successfully done a UDAF that calculates confidence intervals of means (with a lot of trial and error and SO!).

Here's what actually runs for me already:

import org.apache.spark.sql._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{StructType, StructField, DoubleType, LongType, DataType, ArrayType}

case class RegressionData(intercept: Double, slope: Double)

class Regression  {

  import org.apache.commons.math3.stat.regression.SimpleRegression

  def roundAt(p: Int)(n: Double): Double = { val s = math.pow(10, p); math.round(n * s) / s }

  def getRegression(data: List[Long]): RegressionData = {
    val regression: SimpleRegression = new SimpleRegression()
    // Each value's position in the list is the x coordinate, the value itself is y
    data.view.zipWithIndex.foreach { case (value, index) =>
      regression.addData(index.toDouble, value.toDouble)
    }

    RegressionData(roundAt(3)(regression.getIntercept()), roundAt(3)(regression.getSlope()))
  }
}


class UDAFRegression extends UserDefinedAggregateFunction {

  import java.util.ArrayList

  def deterministic = true

  def inputSchema: StructType =
    new StructType().add("units", LongType)

  def bufferSchema: StructType =
    new StructType().add("buff", ArrayType(LongType))


  def dataType: DataType =
    new StructType()
      .add("intercept", DoubleType)
      .add("slope", DoubleType)

  def initialize(buffer: MutableAggregationBuffer) = {
    buffer.update(0, new ArrayList[Long]())
  }

  def update(buffer: MutableAggregationBuffer, input: Row) = {
    // Copy the buffered list and append the incoming value
    val longList: ArrayList[Long] = new ArrayList[Long](buffer.getList(0))
    longList.add(input.getLong(0))
    buffer.update(0, longList)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    val longList: ArrayList[Long] = new ArrayList[Long](buffer1.getList(0))
    longList.addAll(buffer2.getList(0))

    buffer1.update(0, longList)
  }


  def evaluate(buffer: Row) = {
    import scala.collection.JavaConverters._
    val list = buffer.getList(0).asScala.toList
    val regression = new Regression
    regression.getRegression(list)
  }
}
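For context, a minimal sketch of how this UDAF might be invoked; the SparkSession spark, the sample data and the column names group and longValue are assumptions for illustration:

import spark.implicits._  // assumes an active SparkSession named spark

val regression = new UDAFRegression
val df = Seq(("a", 10L), ("a", 20L), ("b", 5L)).toDF("group", "longValue")

df.groupBy($"group")
  .agg(regression($"longValue").as("regression"))
  .show(false)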

However, the datasets do not come in order, which is obviously very important here. Hence instead of regression($"longValue") I need a second param: regression($"longValue", $"created_day"). created_day is a sql.types.DateType.

I am pretty confused by DataTypes, StructTypes and what-not, and due to the lack of examples on the web I got stuck with my trial-and-error attempts here.
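For concreteness, one sketch of what the two-column schemas could look like; DateType comes from org.apache.spark.sql.types, and the struct field names day and value are made up for illustration:

def inputSchema: StructType = new StructType()
  .add("units", LongType)
  .add("created_day", DateType)

// Buffer each (day, value) pair so evaluate() can sort by day later
def bufferSchema: StructType = new StructType()
  .add("buff", ArrayType(new StructType()
    .add("day", DateType)
    .add("value", LongType)))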

Are those StructTypes overhead in my case? Wouldn't a (mutable) Map just do? Is MapType actually immutable, and isn't that rather pointless for a buffer type?

Does this have to match the type I retrieve in update(), in my case via input.getLong(0)?

I have seen buffer.update(0, 0.0) (when it contains Doubles, obviously), buffer(0) = new WhatEver(), and I think even buffer = Nil. Do any of these make a difference?

The example above seems overcomplicated. I was expecting to be able to do something like buffer += input.getLong(0) -> input.getDate(1). Can I expect to access the input this way?

Can I just leave the function block empty, like def merge(…) = {}?

The challenge of sorting that buffer in evaluate() is something I should be able to figure out, although I am still interested in the most elegant way you would do this (in a fraction of the time).
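For the record, one way that sort might look in evaluate(), assuming the buffer holds (day, value) structs as sketched above:

def evaluate(buffer: Row) = {
  import scala.collection.JavaConverters._
  // Sort the buffered rows chronologically before feeding the values to the regression
  val sorted = buffer.getList[Row](0).asScala
    .sortBy(_.getDate(0).getTime)
    .map(_.getLong(1))
    .toList
  new Regression().getRegression(sorted)
}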

I return a case class, not the StructType as defined in dataType, which does not seem to be an issue. Or does it only work because it happens to match my case class?

Recommended answer

Maybe this will clear everything up.

The UDAF APIs work on DataFrame Columns. Everything you are doing has to get serialized, just like all the other Columns in the DataFrame. As you note, the only supported MapType is immutable, because that is the only thing you can put in a Column. With immutable collections, you just create a new collection that contains the old collection plus a value:

var map = Map[Long,Long]()
map = map + (0L -> 1234L)
map = map + (1L -> 4567L)
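Applied to a UDAF buffer, that pattern might look roughly like this inside update(); a MapType(LongType, LongType) buffer in slot 0 is assumed here for illustration:

def update(buffer: MutableAggregationBuffer, input: Row) = {
  // Build a new immutable map containing the old entries plus one more
  val updated = buffer.getMap[Long, Long](0) + (input.getLong(0) -> input.getLong(1))
  buffer.update(0, updated)
}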

Yes, just like working with any DataFrame, your types have to match. Doing buffer.getInt(0) when there's really a LongType there is going to be a problem.
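For example, with the LongType input column from the question, only the matching getter is safe:

val units = input.getLong(0)  // matches the LongType declared in inputSchema
// input.getInt(0) would likely fail at runtime with a ClassCastException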

There's no standard way to reset the buffer, because there is no "zero" other than whatever makes sense for your data type / use case. Maybe zero is actually last month's balance; maybe zero is a running average from another dataset; maybe zero is null or an empty string; or maybe zero is really zero.

merge is an optimization that only happens in certain circumstances, if I remember correctly -- a way to sub-total that the SQL optimizer may use if the circumstances warrant it. I just use the same function I use for update.
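A sketch of that idea applied to the question's list buffer, factoring the shared append logic into a hypothetical helper:

import java.util.{ArrayList, Collections, List => JList}

// Shared helper: append `values` onto the list held in buffer slot 0
private def append(buffer: MutableAggregationBuffer, values: JList[Long]) = {
  val longList = new ArrayList[Long](buffer.getList[Long](0))
  longList.addAll(values)
  buffer.update(0, longList)
}

def update(buffer: MutableAggregationBuffer, input: Row) =
  append(buffer, Collections.singletonList(input.getLong(0)))

def merge(buffer1: MutableAggregationBuffer, buffer2: Row) =
  append(buffer1, buffer2.getList[Long](0))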

A case class will automatically get converted to the appropriate schema, so for the bonus question the answer is yes: it works because the schemas match. Change the dataType so it doesn't match and you will get an error.
