Spark, DataFrame: apply transformer/estimator on groups

Question

I have a DataFrame that looks like the following:

+-----------+-----+------------+
|     userID|group|    features|
+-----------+-----+------------+
|12462563356|    1|  [5.0,43.0]|
|12462563701|    2|   [1.0,8.0]|
|12462563701|    1|  [2.0,12.0]|
|12462564356|    1|   [1.0,1.0]|
|12462565487|    3|   [2.0,3.0]|
|12462565698|    2|   [1.0,1.0]|
|12462565698|    1|   [1.0,1.0]|
|12462566081|    2|   [1.0,2.0]|
|12462566081|    1|  [1.0,15.0]|
|12462566225|    2|   [1.0,1.0]|
|12462566225|    1|  [9.0,85.0]|
|12462566526|    2|   [1.0,1.0]|
|12462566526|    1|  [3.0,79.0]|
|12462567006|    2| [11.0,15.0]|
|12462567006|    1| [10.0,15.0]|
|12462567006|    3| [10.0,15.0]|
|12462586595|    2|  [2.0,42.0]|
|12462586595|    3|  [2.0,16.0]|
|12462589343|    3|   [1.0,1.0]|
+-----------+-----+------------+

The column types are: userID: Long, group: Int, and features: Vector.

This is already a grouped DataFrame, i.e. a given userID appears in a particular group at most once.

My goal is to scale the features column per group.

Is there a way to apply a feature transformer (in my case, a StandardScaler) per group instead of applying it to the full DataFrame?

P.S. Using ML is not mandatory, so a solution based on MLlib is fine too.

Recommended answer

Computing statistics

Spark >= 3.0

Summarizer now supports the standard deviation directly, so:

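import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.stat.Summarizer
// assumes spark.implicits._ is in scope (for $"..." and the tuple encoder)

val summary = data
  .groupBy($"group")
  .agg(Summarizer.metrics("mean", "std").summary($"features").alias("stats"))
  .as[(Int, (Vector, Vector))]  // (group, (mean, std))
  .collect.toMap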

Spark >= 2.3

In Spark 2.3 or later, you can also use Summarizer with the variance:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.stat.Summarizer

val summaryVar = data
  .groupBy($"group")
  .agg(Summarizer.metrics("mean", "variance").summary($"features").alias("stats"))
  .as[(Int, (Vector, Vector))]  // (group, (mean, variance))
  .collect.toMap

Then adjust the downstream code to handle variances instead of standard deviations (take the element-wise square root).
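A minimal sketch of that adjustment, assuming you have already turned the mean/std/variance Vectors into plain arrays with .toArray. SummaryShapes, fromStd and fromVariance are hypothetical names; the target is the (stdev, mean) pair of arrays that the transformation code further down expects. Note that both Summarizer variants return mean first, so the pair also has to be swapped.

```scala
// Hypothetical helpers: reshape per-group Summarizer output (already converted
// with .toArray) into the (stdev, mean) pair used by the transformation step.
object SummaryShapes {
  // Spark >= 3.0: Summarizer already returns the standard deviation.
  def fromStd(mean: Array[Double], std: Array[Double]): (Array[Double], Array[Double]) =
    (std, mean)

  // Spark >= 2.3: Summarizer returns the variance, so take element-wise square roots.
  def fromVariance(mean: Array[Double], variance: Array[Double]): (Array[Double], Array[Double]) =
    (variance.map(math.sqrt), mean)

  def main(args: Array[String]): Unit = {
    val (stdev, mean) = fromVariance(Array(2.0, 10.0), Array(4.0, 9.0))
    println(stdev.mkString(", ")) // 2.0, 3.0
    println(mean.mkString(", "))  // 2.0, 10.0
  }
}
```

Applied to the collected map, this would be something like summaryVar.map { case (g, (mean, v)) => g -> SummaryShapes.fromVariance(mean.toArray, v.toArray) }.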

In older versions (or with the RDD-based MLlib API), you can compute statistics per group using almost the same code as the default StandardScaler:

import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row

// Compute multivariate statistics per group.
// The pattern below expects org.apache.spark.mllib.linalg vectors; if the column
// holds org.apache.spark.ml.linalg vectors, convert them with Vectors.fromML.
val summary = data.select($"group", $"features")
  .rdd
  .map { case Row(group: Int, features: Vector) => (group, features) }
  .aggregateByKey(new MultivariateOnlineSummarizer)( // zero value: an empty summarizer
    (agg, v) => agg.add(v),           // seqOp: add a sample vector and update the summary
    (agg1, agg2) => agg1.merge(agg2)) // combOp: merge two partial summaries
  .mapValues { s =>
    (
      s.variance.toArray.map(math.sqrt(_)), // per-feature standard deviation
      s.mean.toArray                        // per-feature mean
    )
  }
  .collectAsMap
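The seqOp/combOp pair above is what lets the aggregation run in parallel: each partition builds partial summaries, which are then merged. As a Spark-free illustration, here is a minimal sketch of a mergeable per-group accumulator. GroupStats and GroupedSummary are hypothetical names, and the update rules (Welford's online update plus the pairwise merge formula of Chan et al.) are a simplified, unweighted stand-in in the spirit of MultivariateOnlineSummarizer, not its actual implementation. Like MultivariateOnlineSummarizer.variance, it uses the sample (n - 1) variance.

```scala
// Hypothetical, simplified stand-in for MultivariateOnlineSummarizer:
// per-dimension count, running mean and sum of squared deviations (M2).
final case class GroupStats(n: Long, mean: IndexedSeq[Double], m2: IndexedSeq[Double]) {

  // seqOp: add one sample (Welford's online update).
  def add(x: IndexedSeq[Double]): GroupStats =
    if (n == 0) GroupStats(1, x, IndexedSeq.fill(x.size)(0.0))
    else {
      val n1 = n + 1
      val newMean = mean.indices.map(i => mean(i) + (x(i) - mean(i)) / n1)
      val newM2 = mean.indices.map(i => m2(i) + (x(i) - mean(i)) * (x(i) - newMean(i)))
      GroupStats(n1, newMean, newM2)
    }

  // combOp: merge two partial summaries (pairwise formula of Chan et al.).
  def merge(other: GroupStats): GroupStats =
    if (n == 0) other
    else if (other.n == 0) this
    else {
      val total = n + other.n
      val delta = mean.indices.map(i => other.mean(i) - mean(i))
      val newMean = mean.indices.map(i => mean(i) + delta(i) * other.n / total)
      val newM2 = mean.indices.map(i =>
        m2(i) + other.m2(i) + delta(i) * delta(i) * n * other.n / total)
      GroupStats(total, newMean, newM2)
    }

  // Sample (n - 1) standard deviation; 0.0 when there are fewer than two samples.
  def stdev: IndexedSeq[Double] =
    m2.map(v => if (n > 1) math.sqrt(v / (n - 1)) else 0.0)
}

object GroupStats {
  val empty: GroupStats = GroupStats(0, IndexedSeq.empty, IndexedSeq.empty)
}

object GroupedSummary {
  type Sample = (Int, IndexedSeq[Double]) // (group, features)

  // Mimics rdd.aggregateByKey(zero)(seqOp, combOp) over in-memory "partitions".
  def summarize(partitions: Seq[Seq[Sample]]): Map[Int, (IndexedSeq[Double], IndexedSeq[Double])] = {
    // seqOp: fold the samples of each partition into per-group partial summaries
    val perPartition = partitions.map { part =>
      part.foldLeft(Map.empty[Int, GroupStats]) { case (acc, (g, x)) =>
        acc.updated(g, acc.getOrElse(g, GroupStats.empty).add(x))
      }
    }
    // combOp: merge partial summaries of the same group across partitions
    val merged = perPartition.foldLeft(Map.empty[Int, GroupStats]) { (acc, part) =>
      part.foldLeft(acc) { case (a, (g, s)) =>
        a.updated(g, a.get(g).fold(s)(_.merge(s)))
      }
    }
    // (stdev, mean) per group, the same shape as the MLlib code above
    merged.map { case (g, s) => g -> (s.stdev, s.mean) }
  }
}
```

Splitting the same rows across different "partitions" should give the same per-group result, which is exactly the property aggregateByKey relies on.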

Transformation

If the expected number of groups is relatively small, you can broadcast the statistics:

val summaryBd = sc.broadcast(summary)

Then transform your data:

val scaledRows = data.rdd.map { case Row(userID, group: Int, features: Vector) =>
  // (stdev, mean) arrays for this row's group
  val (stdev, mean) = summaryBd.value(group)
  val vs = features.toArray.clone()
  for (i <- vs.indices) {
    // Center and scale; features with zero standard deviation are set to 0.0
    vs(i) = if (stdev(i) == 0.0) 0.0 else (vs(i) - mean(i)) / stdev(i)
  }
  Row(userID, group, Vectors.dense(vs))
}
val scaledDf = sqlContext.createDataFrame(scaledRows, data.schema)
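The per-element rule in that map can be tried out without Spark. In this minimal sketch GroupScaling, Summary and scaleRow are hypothetical names; the summary is a plain Map from group to (stdev, mean) arrays, which is what summaryBd.value holds when it is built from the MLlib code above.

```scala
object GroupScaling {
  // Hypothetical type: group -> (stdev, mean), one entry per feature dimension.
  type Summary = Map[Int, (Array[Double], Array[Double])]

  // Standardize one row with its group's statistics. Dimensions whose
  // stdev is 0.0 are mapped to 0.0, as in the RDD code above.
  def scaleRow(summary: Summary, group: Int, features: Array[Double]): Array[Double] = {
    val (stdev, mean) = summary(group) // throws NoSuchElementException for unknown groups
    Array.tabulate(features.length) { i =>
      if (stdev(i) == 0.0) 0.0 else (features(i) - mean(i)) / stdev(i)
    }
  }

  def main(args: Array[String]): Unit = {
    val summary: Summary = Map(
      1 -> (Array(2.0, 4.0), Array(3.0, 10.0)),
      2 -> (Array(1.0, 0.0), Array(1.0, 1.0))
    )
    println(scaleRow(summary, 1, Array(5.0, 2.0)).mkString(", ")) // 1.0, -2.0
    println(scaleRow(summary, 2, Array(1.0, 1.0)).mkString(", ")) // 0.0, 0.0
  }
}
```

As with summaryBd.value(group), looking up a group that is missing from the summary throws a NoSuchElementException.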

Otherwise, you can simply join the statistics with the data on the group column. It shouldn't be hard to wrap this as an ML Transformer that takes the group column as a parameter.
