Spark,DataFrame:对组应用变换器/估计器 [英] Spark, DataFrame: apply transformer/estimator on groups
问题描述
我有一个如下所示的DataFrame:
I have a DataFrame that looks like follow:
+-----------+-----+------------+
| userID|group| features|
+-----------+-----+------------+
|12462563356| 1| [5.0,43.0]|
|12462563701| 2| [1.0,8.0]|
|12462563701| 1| [2.0,12.0]|
|12462564356| 1| [1.0,1.0]|
|12462565487| 3| [2.0,3.0]|
|12462565698| 2| [1.0,1.0]|
|12462565698| 1| [1.0,1.0]|
|12462566081| 2| [1.0,2.0]|
|12462566081| 1| [1.0,15.0]|
|12462566225| 2| [1.0,1.0]|
|12462566225| 1| [9.0,85.0]|
|12462566526| 2| [1.0,1.0]|
|12462566526| 1| [3.0,79.0]|
|12462567006| 2| [11.0,15.0]|
|12462567006| 1| [10.0,15.0]|
|12462567006| 3| [10.0,15.0]|
|12462586595| 2| [2.0,42.0]|
|12462586595| 3| [2.0,16.0]|
|12462589343| 3| [1.0,1.0]|
+-----------+-----+------------+
列类型为:userID:长整数,组:整数,特征:矢量.
Where the columns types are: userID: Long, group: Int, and features:vector.
这已经是一个分组的DataFrame,即,一个用户ID最多会出现在特定组中一次.
This is already a grouped DataFrame, i.e. a userID will appear in a particular group at max one time.
我的目标是按组扩展features
列.
My goal is to scale the features
column per group.
是否有一种方法可以应用功能转换器(在我的情况下,我想应用 StandardScaler ) per组,而不是将其应用于完整的DataFrame.
Is there a way to apply a feature transformer (in my case I would like to apply a StandardScaler) per group instead of applying it to the full DataFrame.
P.S.使用ML不是强制性的,因此如果解决方案基于MLlib,则没有问题.
P.S. using ML is not mandatory, so no problem if the solution is based on MLlib.
推荐答案
计算统计信息
火花> = 3.0
现在Summarizer
支持标准差,所以
Compute statistics
Spark >= 3.0
Now Summarizer
supports standard deviations so
val summary = data
.groupBy($"group")
.agg(Summarizer.metrics("mean", "std")
.summary($"features").alias("stats"))
.as[(Int, (Vector, Vector))]
.collect.toMap
火花> = 2.3
在Spark 2.3或更高版本中,您还可以使用Summarizer
:
import org.apache.spark.ml.stat.Summarizer
val summaryVar = data
.groupBy($"group")
.agg(Summarizer.metrics("mean", "variance")
.summary($"features").alias("stats"))
.as[(Int, (Vector, Vector))]
.collect.toMap
并调整下游代码以处理差异而不是标准偏差.
and adjust downstream code to handle variances instead of standard deviations.
您可以使用与默认Scaler
几乎相同的代码按组计算统计信息:
You can compute statistics by group using almost the same code as default Scaler
:
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row
// Compute Multivariate Statistics
val summary = data.select($"group", $"features")
.rdd
.map {
case Row(group: Int, features: Vector) => (group, features)
}
.aggregateByKey(new MultivariateOnlineSummarizer)(/* Create an empty new MultivariateOnlineSummarizer */
(agg, v) => agg.add(v), /* seqOp : Add a new sample Vector to this summarizer, and update the statistical summary. */
(agg1, agg2) => agg1.merge(agg2)) /* combOp : As MultivariateOnlineSummarizer accepts a merge action with another MultivariateOnlineSummarizer, and update the statistical summary. */
.mapValues {
s => (
s.variance.toArray.map(math.sqrt(_)), /* compute the square root variance for each key */
s.mean.toArray /* fetch the mean for each key */
)
}.collectAsMap
转化
如果期望的组数相对较低,则可以广播这些:
Transformation
If expected number of groups is relatively low you can broadcast these:
val summaryBd = sc.broadcast(summary)
并转换您的数据:
val scaledRows = df.rdd.map{ case Row(userID, group: Int, features: Vector) =>
val (stdev, mean) = summaryBd.value(group)
val vs = features.toArray.clone()
for (i <- 0 until vs.size) {
vs(i) = if(stdev(i) == 0.0) 0.0 else (vs(i) - mean(i)) * (1 / stdev(i))
}
Row(userID, group, Vectors.dense(vs))
}
val scaledDf = sqlContext.createDataFrame(scaledRows, df.schema)
否则,您可以简单地加入.将其包装为以组列为参数的ML转换器应该不难.
Otherwise you can simply join. It shouldn't be hard to wrap this as a ML transformer with group column as a param.
这篇关于Spark,DataFrame:对组应用变换器/估计器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!