Spark, DataFrame: apply transformer/estimator on groups
Question
I have a DataFrame that looks as follows:
+-----------+-----+------------+
| userID|group| features|
+-----------+-----+------------+
|12462563356| 1| [5.0,43.0]|
|12462563701| 2| [1.0,8.0]|
|12462563701| 1| [2.0,12.0]|
|12462564356| 1| [1.0,1.0]|
|12462565487| 3| [2.0,3.0]|
|12462565698| 2| [1.0,1.0]|
|12462565698| 1| [1.0,1.0]|
|12462566081| 2| [1.0,2.0]|
|12462566081| 1| [1.0,15.0]|
|12462566225| 2| [1.0,1.0]|
|12462566225| 1| [9.0,85.0]|
|12462566526| 2| [1.0,1.0]|
|12462566526| 1| [3.0,79.0]|
|12462567006| 2| [11.0,15.0]|
|12462567006| 1| [10.0,15.0]|
|12462567006| 3| [10.0,15.0]|
|12462586595| 2| [2.0,42.0]|
|12462586595| 3| [2.0,16.0]|
|12462589343| 3| [1.0,1.0]|
+-----------+-----+------------+
where the column types are: userID: Long, group: Int, and features: Vector.
This is already a grouped DataFrame, i.e. a userID will appear in a particular group at most once.
My goal is to scale the features column per group.
Is there a way to apply a feature transformer (in my case I would like to apply a StandardScaler) per group instead of applying it to the full DataFrame?
P.S. Using ML is not mandatory, so a solution based on MLlib would be fine too.
Answer
Computing the statistics
Spark >= 3.0
Summarizer now supports standard deviation, so:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.stat.Summarizer

val summary = data
  .groupBy($"group")
  .agg(Summarizer.metrics("mean", "std")
    .summary($"features").alias("stats"))
  .as[(Int, (Vector, Vector))]
  .collect.toMap
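Note that the tuples in this map are (mean, std) vectors, while the transformation code further below expects (stdev, mean) array pairs. A minimal bridging sketch (an addition, not part of the original answer), producing a map you could broadcast in place of summary below:

val summaryArrays = summary.map { case (group, (mean, std)) =>
  group -> (std.toArray, mean.toArray) // reorder to the (stdev, mean) array layout
}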
Spark >= 2.3
In Spark 2.3 or later you can also use Summarizer:
import org.apache.spark.ml.stat.Summarizer

val summaryVar = data
  .groupBy($"group")
  .agg(Summarizer.metrics("mean", "variance")
    .summary($"features").alias("stats"))
  .as[(Int, (Vector, Vector))]
  .collect.toMap
and adjust the downstream code to handle variances instead of standard deviations.
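For example, a minimal sketch of that adjustment (assuming the summaryVar map above, where each tuple is (mean, variance)): take the square root of each variance, so the transformation code below can be reused unchanged:

val summary = summaryVar.map { case (group, (mean, variance)) =>
  group -> (variance.toArray.map(math.sqrt), mean.toArray) // (stdev, mean) arrays
}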
Spark < 2.3
You can compute the statistics by group using almost the same code as the default Scaler:
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row

// Compute multivariate statistics per group
val summary = data.select($"group", $"features")
  .rdd
  .map { case Row(group: Int, features: Vector) => (group, features) }
  .aggregateByKey(new MultivariateOnlineSummarizer)( // start from an empty summarizer per key
    (agg, v) => agg.add(v),           // seqOp: add a sample vector and update the summary
    (agg1, agg2) => agg1.merge(agg2)) // combOp: merge two partial summarizers
  .mapValues { s => (
    s.variance.toArray.map(math.sqrt(_)), // standard deviation per dimension
    s.mean.toArray                        // mean per dimension
  )}
  .collectAsMap
Transformation
If the expected number of groups is relatively low, you can broadcast these:
val summaryBd = sc.broadcast(summary)
and transform your data:
val scaledRows = df.rdd.map { case Row(userID, group: Int, features: Vector) =>
  val (stdev, mean) = summaryBd.value(group)
  val vs = features.toArray.clone()
  for (i <- 0 until vs.size) {
    vs(i) = if (stdev(i) == 0.0) 0.0 else (vs(i) - mean(i)) * (1 / stdev(i))
  }
  Row(userID, group, Vectors.dense(vs))
}
val scaledDf = sqlContext.createDataFrame(scaledRows, df.schema)
Otherwise you can simply join. It shouldn't be hard to wrap this as an ML transformer with the group column as a param.
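For instance, a hedged sketch of the join-based variant, assuming the Spark >= 2.3 path where features holds ml.linalg vectors (the scale UDF and the mean/std column names are illustrative, not from the original answer):

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.stat.Summarizer
import org.apache.spark.sql.functions.udf

// Per-group stats kept as a DataFrame instead of a broadcast map
val statsDf = df
  .groupBy($"group")
  .agg(Summarizer.metrics("mean", "std").summary($"features").alias("stats"))
  .select($"group", $"stats.mean".alias("mean"), $"stats.std".alias("std"))

// Scale each vector with its group's statistics; zero out constant dimensions
val scale = udf { (v: Vector, mean: Vector, std: Vector) =>
  Vectors.dense(v.toArray.indices.map { i =>
    if (std(i) == 0.0) 0.0 else (v(i) - mean(i)) / std(i)
  }.toArray)
}

val scaledDf = df.join(statsDf, Seq("group"))
  .withColumn("features", scale($"features", $"mean", $"std"))
  .drop("mean", "std")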