Spark: OneHot encoder and storing Pipeline (feature dimension issue)


Question

We have a pipeline (2.0.1) consisting of multiple feature transformation stages.

Some of these stages are OneHot encoders. Idea: classify an integer-based category into n independent features.

When training the pipeline model and using it to predict all works fine. However, storing the trained pipeline model and reloading it causes issues:

The stored 'trained' OneHot encoder does not keep track of how many categories there are. Loading it back therefore causes issues: when the loaded model is used to predict, it redetermines how many categories there are, so the training feature space and the prediction feature space end up with different sizes (dimensions). See the example code below, as run in a Zeppelin notebook:

import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.PipelineModel


// Specifying two test samples, one with class 5 and one with class 3. This is OneHot encoded into 5 boolean features (sparse vector)
// Adding a 'filler' column because createDataFrame doesn't like single-column sequences and this is the easiest way to demo it ;)
val df = spark.createDataFrame(Seq((5, 1), (3, 1))).toDF("class", "filler")

val enc = new OneHotEncoder()
  .setInputCol("class")
  .setOutputCol("class_one_hot")

val pipeline = new Pipeline()
  .setStages(Array(enc))

val model = pipeline.fit(df)
model.transform(df).show()

/*
+-----+------+-------------+
|class|filler|class_one_hot|
+-----+------+-------------+
|    5|     1|(5,[],[])    |
|    3|     1|(5,[3],[1.0])|
+-----+------+-------------+

Note: Vector of size 5
*/

model.write.overwrite().save("s3a://one-hot")

val loadedModel = PipelineModel.load("s3a://one-hot")

// When using the trained model our input consists of one row (prediction engine style).
// The provided category for the prediction feature set is category 3.
val df2 = spark.createDataFrame(Seq((3, 1))).toDF("class", "output")
loadedModel.transform(df2).show()

/*
+-----+------+-------------+
|class|output|class_one_hot|
+-----+------+-------------+
|    3|     1|(3,[],[])    |
+-----+------+-------------+

Note: Incompatible vector of size 3
*/

I'd prefer not to write my own OneHot encoder that DOES support this serialization. Are there any alternatives that I can use out of the box?

Answer

Spark >= 2.3

Spark 2.3 introduces OneHotEncoderEstimator (to be renamed OneHotEncoder in Spark 3.0), which can be used directly and supports multiple input columns.
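
As a minimal sketch (assuming the df and df2 from the question; the save path is illustrative), the fitted estimator remembers how many categories it saw, so the vector size survives save/load:

import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.{Pipeline, PipelineModel}

val enc = new OneHotEncoderEstimator()
  .setInputCols(Array("class"))
  .setOutputCols(Array("class_one_hot"))

// Because it is an Estimator, fit() learns the number of categories,
// and the fitted model keeps that size after being saved and reloaded.
val model = new Pipeline().setStages(Array(enc)).fit(df)
model.write.overwrite().save("s3a://one-hot-estimator")  // illustrative path

val reloaded = PipelineModel.load("s3a://one-hot-estimator")
reloaded.transform(df2).show()  // vector size matches the training dimension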

Spark < 2.3

You don't use OneHotEncoder as it is intended to be used. OneHotEncoder is a Transformer, not an Estimator. It doesn't store any information about the levels but depends on the Column metadata to determine the output dimensions. If metadata is missing, as in your case, it uses a fallback strategy and assumes there are max(input_column) levels. Serialization is irrelevant here.

Typical usage involves Transformers in the upstream Pipeline, which set metadata for you. One common example is StringIndexer.
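
A sketch of that pattern, assuming the question's df (the intermediate column name class_idx is illustrative):

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.ml.Pipeline

// StringIndexerModel remembers the labels it saw during fit() and attaches
// nominal metadata to class_idx, from which OneHotEncoder derives the vector size.
val indexer = new StringIndexer()
  .setInputCol("class")
  .setOutputCol("class_idx")

val enc = new OneHotEncoder()
  .setInputCol("class_idx")
  .setOutputCol("class_one_hot")

val model = new Pipeline().setStages(Array(indexer, enc)).fit(df)
model.transform(df).show()

Because the fitted StringIndexerModel persists its labels, the metadata is still there after the PipelineModel is saved and reloaded.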

It is still possible to set metadata manually, but it is more involved:

import org.apache.spark.ml.attribute.NominalAttribute

// Describe "class" as a nominal attribute with levels "0".."5" so the encoder
// can read the number of categories from the column metadata.
val meta = NominalAttribute.defaultAttr
  .withName("class")
  .withValues("0", (1 to 5).map(_.toString): _*)
  .toMetadata

// Attach the metadata to the column before transforming with the loaded model.
loadedModel.transform(df2.select($"class".as("class", meta), $"output"))

Similarly in Python (needs Spark >= 2.2):

from pyspark.sql.functions import col

meta = {"ml_attr": {
    "vals": [str(x) for x in range(6)],   # Provide a set of levels
    "type": "nominal", 
    "name": "class"}}

loaded.transform(
    df.withColumn("class", col("class").alias("class", metadata=meta))
)

Metadata can also be attached using a number of different methods: see How to change column metadata in pyspark?. One schema-based variant is sketched below.
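
As one more hedged variant in Scala (assuming the df2 and loadedModel from above; the level values "0".."5" are illustrative), the metadata can be baked directly into the schema and the DataFrame rebuilt:

import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}

// Build the same ml_attr structure as in the Python example above.
val mlAttr = new MetadataBuilder()
  .putString("type", "nominal")
  .putString("name", "class")
  .putStringArray("vals", (0 to 5).map(_.toString).toArray)
  .build()
val classMeta = new MetadataBuilder().putMetadata("ml_attr", mlAttr).build()

// Replace the "class" field with one carrying the metadata and rebuild the DataFrame.
val fieldsWithMeta = df2.schema.fields.map {
  case StructField("class", dataType, nullable, _) => StructField("class", dataType, nullable, classMeta)
  case other => other
}
val df2WithMeta = spark.createDataFrame(df2.rdd, StructType(fieldsWithMeta))
loadedModel.transform(df2WithMeta).show()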
