UDAF in Spark with multiple input columns
Question
I am trying to develop a user-defined aggregate function that computes a linear regression on a row of numbers. I have successfully done a UDAF that calculates confidence intervals of means (with a lot of trial and error and SO!).
Here's what actually runs for me already:
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{StructType, StructField, DoubleType, LongType, DataType, ArrayType}

case class RegressionData(intercept: Double, slope: Double)

class Regression {
  import org.apache.commons.math3.stat.regression.SimpleRegression

  def roundAt(p: Int)(n: Double): Double = {
    val s = math.pow(10, p); math.round(n * s) / s
  }

  def getRegression(data: List[Long]): RegressionData = {
    val regression: SimpleRegression = new SimpleRegression()
    data.view.zipWithIndex.foreach { d =>
      // x = position in the list, y = the value itself
      regression.addData(d._2.toDouble, d._1.toDouble)
    }
    RegressionData(roundAt(3)(regression.getIntercept()), roundAt(3)(regression.getSlope()))
  }
}

class UDAFRegression extends UserDefinedAggregateFunction {
  import java.util.ArrayList

  def deterministic = true

  def inputSchema: StructType =
    new StructType().add("units", LongType)

  def bufferSchema: StructType =
    new StructType().add("buff", ArrayType(LongType))

  def dataType: DataType =
    new StructType()
      .add("intercept", DoubleType)
      .add("slope", DoubleType)

  def initialize(buffer: MutableAggregationBuffer) = {
    buffer.update(0, new ArrayList[Long]())
  }

  def update(buffer: MutableAggregationBuffer, input: Row) = {
    val longList: ArrayList[Long] = new ArrayList[Long](buffer.getList(0))
    longList.add(input.getLong(0))
    buffer.update(0, longList)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    val longList: ArrayList[Long] = new ArrayList[Long](buffer1.getList(0))
    longList.addAll(buffer2.getList(0))
    buffer1.update(0, longList)
  }

  def evaluate(buffer: Row) = {
    import scala.collection.JavaConverters._
    val list = buffer.getList(0).asScala.toList
    val regression = new Regression
    regression.getRegression(list)
  }
}
However, the datasets do not come in order, which is obviously very important here. Hence, instead of regression($"longValue") I need a second param: regression($"longValue", $"created_day"). created_day is a sql.types.DateType.
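One way to think about the two-column variant is to keep the ordering key next to each value in the buffer. The sketch below models that in plain Scala (names like Entry and addRow are my own, not Spark API); in the real UDAF the DateType column arrives in update() as a java.sql.Date, which converts cleanly to an epoch day:

```scala
import java.time.LocalDate

// One buffer entry per input row: (epochDay, longValue).
// Storing the day alongside the value lets evaluate() sort later.
type Entry = (Long, Long)

def addRow(buffer: List[Entry], day: LocalDate, value: Long): List[Entry] =
  (day.toEpochDay, value) :: buffer

// Rows arriving out of order:
val buffer = addRow(addRow(Nil, LocalDate.of(2017, 1, 2), 20L),
                    LocalDate.of(2017, 1, 1), 10L)

// Recover chronological order before fitting the regression:
val ordered = buffer.sortBy(_._1).map(_._2)
// ordered == List(10L, 20L)
```

In the Spark version this corresponds to inputSchema gaining a second field and bufferSchema holding pairs instead of bare longs.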
I am pretty confused by DataTypes, StructTypes and what-not, and due to the lack of examples on the web, I got stuck with my trial-and-error attempts here.
Are those StructTypes overhead in my case? Wouldn't a (mutable) Map just do? Is MapType actually immutable, and isn't that rather pointless for a buffer type?
Does this have to match the type I retrieve in update(), in my case via input.getLong(0)?
I have seen buffer.update(0, 0.0) (when it contains Doubles, obviously), buffer(0) = new WhatEver(), and I think even buffer = Nil. Do any of these make a difference?
The example above seems overcomplicated. I was expecting to be able to do something like buffer += input.getLong(0) -> input.getDate(1). Can I expect to access the input this way?
Can I just leave the function block empty, like def merge(…) = {}?
The challenge of sorting that buffer in evaluate() is something I should be able to figure out, although I am still interested in the most elegant ways you would do this (in a fraction of the time).
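For the sorting itself, one sketch in plain Scala (pairs stands in for the buffer contents): sort the (day, value) pairs by day and use the day itself as the regression's x coordinate, so gaps between days do not distort the slope.

```scala
// (epochDay, value) pairs as they might sit in the buffer, unordered:
val pairs = List((17533L, 30L), (17531L, 10L), (17532L, 20L))

// Sort chronologically, then use the day as x and the value as y:
val sorted = pairs.sortBy(_._1)

// With commons-math this would feed SimpleRegression like:
//   sorted.foreach { case (x, y) => regression.addData(x.toDouble, y.toDouble) }
val ys = sorted.map(_._2)
// ys == List(10L, 20L, 30L)
```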
I return a case class, not the StructType as defined in dataType, which does not seem to be an issue. Or is it only working because it happens to match my case class?
Answer
Maybe this will clear everything up.

The UDAF APIs work on DataFrame Columns. Everything you are doing has to get serialized just like all the other Columns in the DataFrame. As you note, the only supported MapType is immutable, because that is the only thing you can put in a Column. With immutable collections, you just create a new collection that contains the old collection plus a value:
var map = Map[Long,Long]()
map = map + (0L -> 1234L)
map = map + (1L -> 4567L)
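The same pattern covers merge: combining two immutable map buffers is just ++ (a sketch; note that with duplicate keys the right-hand map's values win):

```scala
// Two partial buffers, e.g. from two partitions:
val left  = Map(0L -> 1234L)
val right = Map(1L -> 4567L)

// Merging immutable maps creates a new map; neither input is mutated.
val merged = left ++ right
// merged == Map(0L -> 1234L, 1L -> 4567L)
```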
Yes, just like working with any DataFrame, your types have to match. Doing buffer.getInt(0) when there's really a LongType there is going to be a problem.
There's no standard way to reset the buffer, because nothing makes sense other than whatever fits your data type / use case. Maybe zero is actually last month's balance; maybe zero is a running average from another dataset; maybe zero is a null or an empty string; or maybe zero is really zero.
merge is an optimization that only happens in certain circumstances, if I remember correctly: a way to sub-total that the SQL optimizer may use if circumstances warrant it. I just use the same function I use for update.
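Reusing the update logic for merge can be sketched like this in plain Scala (modelling the buffers as Lists; the real signatures take MutableAggregationBuffer and Row):

```scala
// update: append one value to the buffer
def update(buffer: List[Long], value: Long): List[Long] =
  buffer :+ value

// merge: fold every value of the second buffer in with the same update
def merge(buffer1: List[Long], buffer2: List[Long]): List[Long] =
  buffer2.foldLeft(buffer1)(update)

val merged = merge(List(1L, 2L), List(3L, 4L))
// merged == List(1L, 2L, 3L, 4L)
```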
A case class will automatically get converted to the appropriate schema, so for the bonus question the answer is yes: it works because the schemas match. Change the dataType so it does not match and you will get an error.