Spark ML insert/fit custom OneHotEncoder into a Pipeline

Problem description

Say I have a few features/columns in a dataframe on which I apply the regular OneHotEncoder, and one column (say, the n-th) on which I need to apply my custom OneHotEncoder. Then I need to use VectorAssembler to assemble those features, put everything into a Pipeline, and finally fit it on my trainData and get predictions for my testData, such as:

val sIndexer1 = new StringIndexer().setInputCol("my_feature1").setOutputCol("indexed_feature1")
// ... let, n-1 such sIndexers for n-1 features
val featureEncoder = new OneHotEncoderEstimator().setInputCols(Array(sIndexer1.getOutputCol), ...).
      setOutputCols(Array("encoded_feature1", ... ))

// **need to insert output from my custom OneHotEncoder function (please see below)**
// (which takes the n-th feature as input) in a way that matches the VectorAssembler below

val vectorAssembler = new VectorAssembler().setInputCols(featureEncoder.getOutputCols + ???).
      setOutputCol("assembled_features")

...

val pipeline = new Pipeline().setStages(Array(sIndexer1, ...,featureEncoder, vectorAssembler, myClassifier))
val model = pipeline.fit(trainData)
val predictions = model.transform(testData)

How can I modify the construction of the vectorAssembler so that it ingests the output of the custom OneHotEncoder? The problem is that my desired oheEncodingTopN() cannot (and should not) refer to the "actual" dataframe, since it would be part of the pipeline (applied to trainData/testData).

Note:

I tested the custom OneHotEncoder (see link) separately on, e.g., trainData, and it works as expected. Basically, oheEncodingTopN applies one-hot encoding to the input column, but only for the N most frequent values (e.g. N = 50), and puts all the remaining, less frequent values into a dummy column (say, "default"), e.g.:

val oheEncoded = oheEncodingTopN(df, "my_featureN", 50)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, when}
import org.apache.spark.sql.Column


def flip(col: Column): Column = when(col === 1, lit(0)).otherwise(lit(1))

def oheEncodingTopN(df: DataFrame, colName: String, n: Int): DataFrame = {
  df.createOrReplaceTempView("data")
  // df.sparkSession instead of a global `spark`, so this also works outside spark-shell
  val topNDF = df.sparkSession.sql(s"select $colName, count(*) as count from data group by $colName order by count desc limit $n")

  val pivotTopNDF = topNDF.
    groupBy(colName).
    pivot(colName).
    count().
    withColumn("default", lit(1))

  val joinedTopNDF = df.join(pivotTopNDF, Seq(colName), "left").drop(colName)

  val oheEncodedDF = joinedTopNDF.
    na.fill(0, joinedTopNDF.columns).
    withColumn("default", flip(col("default")))

  oheEncodedDF
}
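To make the intended encoding concrete, here is a minimal, Spark-free Scala sketch of the same idea: learn the N most frequent values once (e.g. from trainData), then map every value to a 0/1 vector whose last slot is the "default" bucket. The object and method names (TopNOneHot, fitTopN, encode) are hypothetical, and breaking frequency ties alphabetically is an assumption (the SQL in oheEncodingTopN does not define a tie-break). Note the design difference: oheEncodingTopN recomputes the top-N list from whatever DataFrame it is given, whereas here the list is computed once and reused, which is what a pipeline stage needs so that trainData and testData get the same columns.

```scala
// Spark-free sketch of top-N one-hot encoding with a "default" bucket.
// TopNOneHot, fitTopN and encode are hypothetical names, not a library API.
object TopNOneHot {
  // Learn the n most frequent values, most frequent first.
  // Ties are broken by value (an assumption) so the result is deterministic.
  def fitTopN(values: Seq[String], n: Int): Vector[String] =
    values
      .groupBy(identity)
      .map { case (value, occurrences) => (value, occurrences.size) }
      .toVector
      .sortBy { case (value, count) => (-count, value) }
      .take(n)
      .map(_._1)

  // Encode one value as a 0/1 vector of length topN.size + 1.
  // Slot i means "value == topN(i)"; the last slot is the "default" column.
  def encode(topN: Vector[String], value: String): Vector[Int] = {
    val slot = topN.indexOf(value) match {
      case -1 => topN.size // infrequent or unseen value
      case i  => i
    }
    Vector.tabulate(topN.size + 1)(i => if (i == slot) 1 else 0)
  }
}
```

For example, with training values a, b, a, c, a, b, d and n = 2, fitTopN returns Vector(a, b); encode then maps "a" to Vector(1, 0, 0), and both "c" and any value never seen during training to Vector(0, 0, 1).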

Recommended answer

I think the cleanest way is to create your own class that extends the Spark ML Transformer, so that you can use it like any other transformer (such as OneHotEncoder). Your class would look like this:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.Param
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, Column}

class OHEncodingTopN(n :Int, override val uid: String) extends Transformer {
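  final val inputCol = new Param[String](this, "inputCol", "The input column")
  final val outputCol = new Param[String](this, "outputCol", "The output column")

  def setInputCol(value: String): this.type = set(inputCol, value)

  def setOutputCol(value: String): this.type = set(outputCol, value)

  // Getters, so that other stages (e.g. a VectorAssembler) can refer to this stage's columns
  def getInputCol: String = $(inputCol)

  def getOutputCol: String = $(outputCol)

  def this(n: Int) = this(n, Identifiable.randomUID("OHEncodingTopN"))

  // defaultCopy(extra) needs a constructor taking only the uid String, which this class
  // does not have (n is a constructor argument), so build the copy explicitly
  override def copy(extra: ParamMap): OHEncodingTopN = {
    copyValues(new OHEncodingTopN(n, uid), extra)
  }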

  override def transformSchema(schema: StructType): StructType = {
    // Check that the input type is what you want if needed 
    //     val idx = schema.fieldIndex($(inputCol))
    //     val field = schema.fields(idx)
    //     if (field.dataType != StringType) {
    //       throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
    //     }
    // Add the return field
    schema.add(StructField($(outputCol), IntegerType, false))
  }
  def flip(col: Column): Column = when(col === 1, lit(0)).otherwise(lit(1))

  def transform(df: Dataset[_]): DataFrame = {
      df.createOrReplaceTempView("data")
      val colName = $(inputCol)
      val topNDF = df.sparkSession.sql(s"select $colName, count(*) as count from data group by $colName order by count desc limit $n")

      val pivotTopNDF = topNDF.
        groupBy(colName).
        pivot(colName).
        count().
        withColumn("default", lit(1))

      val joinedTopNDF = df.join(pivotTopNDF, Seq(colName), "left").drop(colName)

      val oheEncodedDF = joinedTopNDF.
        na.fill(0, joinedTopNDF.columns).
        withColumn("default", flip(col("default")))

      oheEncodedDF
  }
}

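Now, on an OHEncodingTopN object, you should be able to call .getOutputCol (given a getter such as def getOutputCol: String = $(outputCol) on the class) and pass the result to the VectorAssembler's setInputCols. Good luck.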

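EDIT: The method that I just copy-pasted into transform should be slightly modified so that it outputs a single column of type Vector, with the name given in setOutputCol. As written, it returns one pivoted column per frequent value plus "default", not the column declared in transformSchema.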
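Besides the Vector output, the Transformer above has a second issue: it recomputes the top-N values from whatever DataFrame it receives, so testData would be encoded with its own most frequent values (and possibly different columns) rather than trainData's. In Spark ML, a stage that learns something from the training data is usually written as an Estimator whose fit() returns a Model, the way StringIndexer returns a StringIndexerModel. Below is a minimal sketch along those lines, assuming the Spark 2.x/3.x Scala ML API and an input column that can be cast to string; TopNOneHotEncoder, TopNOneHotModel, TopNOneHotParams and the param n are hypothetical names, not part of Spark.

```scala
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.linalg.{SQLDataTypes, Vectors}
import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamValidators, Params}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, desc, udf}
import org.apache.spark.sql.types.{StructField, StructType}

// Params shared by the estimator and the fitted model (hypothetical names, not Spark API).
trait TopNOneHotParams extends Params {
  final val inputCol = new Param[String](this, "inputCol", "input column")
  final val outputCol = new Param[String](this, "outputCol", "output vector column")
  final val n = new IntParam(this, "n", "number of most frequent values to keep", ParamValidators.gt(0))
  setDefault(n, 50)

  def getInputCol: String = $(inputCol)
  def getOutputCol: String = $(outputCol)

  // Output schema: the input schema plus one vector column.
  protected def validateAndTransformSchema(schema: StructType): StructType = {
    require(schema.fieldNames.contains($(inputCol)), s"Missing input column ${$(inputCol)}")
    require(!schema.fieldNames.contains($(outputCol)), s"Column ${$(outputCol)} already exists")
    schema.add(StructField($(outputCol), SQLDataTypes.VectorType, nullable = false))
  }
}

// Estimator: learns the top-N values from the data passed to fit() (i.e. trainData).
class TopNOneHotEncoder(override val uid: String)
    extends Estimator[TopNOneHotModel] with TopNOneHotParams {

  def this() = this(Identifiable.randomUID("topNOneHot"))

  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)
  def setN(value: Int): this.type = set(n, value)

  override def fit(dataset: Dataset[_]): TopNOneHotModel = {
    transformSchema(dataset.schema, logging = true)
    val c = $(inputCol)
    // Count non-null values (as strings), most frequent first; ties broken by value.
    val topValues = dataset
      .select(col(c).cast("string").as(c))
      .where(col(c).isNotNull)
      .groupBy(c)
      .count()
      .orderBy(desc("count"), col(c))
      .limit($(n))
      .collect()
      .map(_.getString(0))
    copyValues(new TopNOneHotModel(uid, topValues).setParent(this))
  }

  override def transformSchema(schema: StructType): StructType =
    validateAndTransformSchema(schema)

  // Works because this class has a constructor taking only the uid.
  override def copy(extra: ParamMap): TopNOneHotEncoder = defaultCopy(extra)
}

// Fitted model: encodes any dataset (trainData or testData) with the same top-N list.
class TopNOneHotModel(override val uid: String, val topValues: Array[String])
    extends Model[TopNOneHotModel] with TopNOneHotParams {

  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema, logging = true)
    val index: Map[String, Int] = topValues.zipWithIndex.toMap
    val size = topValues.length + 1 // last slot is the "default" bucket
    val encode = udf { (value: String) =>
      // Nulls and values outside the learned top N go to the "default" slot.
      val slot = if (value == null) size - 1 else index.getOrElse(value, size - 1)
      Vectors.sparse(size, Array(slot), Array(1.0))
    }
    dataset.withColumn($(outputCol), encode(col($(inputCol)).cast("string")))
  }

  override def transformSchema(schema: StructType): StructType =
    validateAndTransformSchema(schema)

  override def copy(extra: ParamMap): TopNOneHotModel =
    copyValues(new TopNOneHotModel(uid, topValues), extra).setParent(parent)
}
```

In the question's pipeline, this stage would read the raw n-th column directly (no StringIndexer needed), e.g. new TopNOneHotEncoder().setInputCol("my_featureN").setOutputCol("encoded_featureN").setN(50); the assembler would then use setInputCols(featureEncoder.getOutputCols :+ topN.getOutputCol), with topN placed in setStages before vectorAssembler. Because the top-N list is learned during pipeline.fit(trainData) and stored in the model, testData is encoded with the same columns. The sketch does not implement MLWritable, so a PipelineModel containing it cannot be saved as-is.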