Encode and assemble multiple features in PySpark


Problem Description

I have a Python class that I'm using to load and process some data in Spark. Among the various things I need to do, I'm generating a list of dummy variables derived from various columns in a Spark dataframe. My problem is that I'm not sure how to properly define a User Defined Function (UDF) to accomplish what I need.

I do currently have a method that, when mapped over the underlying dataframe RDD, solves half the problem (remember that this is a method in a larger data_processor class):

def build_feature_arr(self,table):
    # this dict has keys for all the columns for which I need dummy coding
    categories = {'gender':['1','2'], ..}

    # there are actually two different dataframes that I need to do this for; this just
    # specifies which one I'm looking at, and grabs the relevant features from a config file
    if table == 'users':
        iter_over = self.config.dyadic_features_to_include
    elif table == 'activity':
        iter_over = self.config.user_features_to_include

    def _build_feature_arr(row):
        result = []
        row = row.asDict()
        for col in iter_over:
            column_value = str(row[col]).lower()
            cats = categories[col]
            result += [1 if column_value and cat==column_value else 0 for cat in cats]
        return result
    return _build_feature_arr

Essentially, for the specified dataframe, this takes the categorical variable values for the specified columns and returns a list of the values of these new dummy variables. That means the following code:

data = data_processor(init_args)
result = data.user_data.rdd.map(data.build_feature_arr('users'))

returns something like:

In [39]: result.take(10)
Out[39]:
[[1, 0, 0, 0, 1, 0],
 [1, 0, 0, 1, 0, 0],
 [1, 0, 0, 0, 0, 0],
 [1, 0, 1, 0, 0, 0],
 [1, 0, 0, 1, 0, 0],
 [1, 0, 0, 1, 0, 0],
 [0, 1, 1, 0, 0, 0],
 [1, 0, 1, 1, 0, 0],
 [1, 0, 0, 1, 0, 0],
 [1, 0, 0, 0, 0, 1]]

This is exactly what I want in terms of generating the list of dummy variables, but here's my question: how can I either (a) make a UDF with similar functionality that I can use in a Spark SQL query (or some other way, I suppose), or (b) take the RDD resulting from the map described above and add it as a new column to the user_data dataframe?

Either way, what I need to do is generate a new dataframe containing the columns from user_data, along with a new column (let's call it feature_array) containing the output of the function above (or something functionally equivalent).

Recommended Answer

Spark >= 2.3, >= 3.0

Since Spark 2.3, OneHotEncoder has been deprecated in favor of OneHotEncoderEstimator. If you use a recent release, please modify the encoder code:

from pyspark.ml.feature import OneHotEncoderEstimator

encoder = OneHotEncoderEstimator(
    inputCols=["gender_numeric"],  
    outputCols=["gender_vector"]
)

In Spark 3.0 this variant has been renamed to OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(
    inputCols=["gender_numeric"],  
    outputCols=["gender_vector"]
)
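Note that, unlike the pre-2.3 OneHotEncoder used further below, this variant is an Estimator, so it has to be fitted before it can transform anything. A minimal usage sketch, assuming a DataFrame df that already contains the gender_numeric column:

encoded = encoder.fit(df).transform(df)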

Additionally, StringIndexer has been extended to support multiple input columns:

StringIndexer(inputCols=["gender"], outputCols=["gender_numeric"])
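For example, a minimal Spark 3.0 sketch (assuming a SparkSession named spark and a string column gender) that chains the multi-column StringIndexer with the renamed OneHotEncoder could look like this:

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("0", 3.0), ("1", 1.0), ("1", -1.0), ("0", -3.0)],
    ["gender", "foo"])

pipeline = Pipeline(stages=[
    StringIndexer(inputCols=["gender"], outputCols=["gender_numeric"]),
    OneHotEncoder(inputCols=["gender_numeric"], outputCols=["gender_vector"]),
])
pipeline.fit(df).transform(df).show()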

Spark < 2.3

Well, you can write a UDF, but why would you? There are already quite a few tools designed to handle this category of tasks:

from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

row = Row("gender", "foo", "bar")

df = sc.parallelize([
  row("0", 3.0, DenseVector([0, 2.1, 1.0])),
  row("1", 1.0, DenseVector([0, 1.1, 1.0])),
  row("1", -1.0, DenseVector([0, 3.4, 0.0])),
  row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()

First, StringIndexer:

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)
indexed_df = indexer.transform(df)
indexed_df.drop("bar").show()

## +------+----+--------------+
## |gender| foo|gender_numeric|
## +------+----+--------------+
## |     0| 3.0|           0.0|
## |     1| 1.0|           1.0|
## |     1|-1.0|           1.0|
## |     0|-3.0|           0.0|
## +------+----+--------------+

Next, OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector")
encoded_df = encoder.transform(indexed_df)
encoded_df.drop("bar").show()

## +------+----+--------------+-------------+
## |gender| foo|gender_numeric|gender_vector|
## +------+----+--------------+-------------+
## |     0| 3.0|           0.0|(1,[0],[1.0])|
## |     1| 1.0|           1.0|    (1,[],[])|
## |     1|-1.0|           1.0|    (1,[],[])|
## |     0|-3.0|           0.0|(1,[0],[1.0])|
## +------+----+--------------+-------------+
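The last category ("1") shows up as an empty sparse vector because OneHotEncoder drops the last category by default to avoid perfectly collinear columns. If you want one explicit slot per category, a small variation on the encoder above (reusing the same indexed_df) would be:

encoder_full = OneHotEncoder(
    inputCol="gender_numeric", outputCol="gender_vector", dropLast=False)
encoder_full.transform(indexed_df).drop("bar").show()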

VectorAssembler:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["gender_vector", "bar", "foo"], outputCol="features")

final_df = assembler.transform(encoded_df)

If bar contained categorical variables, you could use VectorIndexer to set the required metadata:

from pyspark.ml.feature import VectorIndexer

vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")

# which could then be applied before assembling:
encoded_df_with_indexed_bar = (vector_indexer
    .fit(encoded_df)
    .transform(encoded_df))

but that is not the case here.

Finally, you can wrap all of that using a Pipeline:

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)

Arguably this is a much more robust and clean approach than writing everything from scratch. There are some caveats, especially when you need consistent encoding between different datasets. You can read more in the official documentation for StringIndexer and VectorIndexer.
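For instance, to keep the encoding consistent across datasets you would typically fit the pipeline once and then reuse (or persist) the fitted PipelineModel rather than refitting it on each dataset. A hedged sketch, assuming Spark 2.x+ and made-up names train_df, other_df and a path of your choosing:

model = pipeline.fit(train_df)             # fit the indexers/encoders once
encoded_train = model.transform(train_df)
encoded_other = model.transform(other_df)  # reuse the same fitted model

model.write().overwrite().save("/tmp/feature_pipeline")

from pyspark.ml import PipelineModel
reloaded = PipelineModel.load("/tmp/feature_pipeline")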

Another way to get a comparable output is RFormula, which:

RFormula produces a vector column of features and a double or string column of label. Like when formulas are used in R for linear regression, string input columns will be one-hot encoded, and numeric columns will be cast to doubles. If the label column is of type string, it will be first transformed to double with StringIndexer. If the label column does not exist in the DataFrame, the output label column will be created from the specified response variable in the formula.

from pyspark.ml.feature import RFormula

rf = RFormula(formula="~ gender + bar + foo - 1")
final_df_rf = rf.fit(df).transform(df)

As you can see, it is much more concise, but it is harder to compose and doesn't allow much customization. Nevertheless, the result for a simple pipeline like this one will be identical:

final_df_rf.select("features").show(4, False)

## +----------------------+
## |features              |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0])  |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+


final_df.select("features").show(4, False)

## +----------------------+
## |features              |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0])  |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+

Regarding your questions:

make a UDF with similar functionality that I can use in a Spark SQL query (or some other way, I suppose)

It is just a UDF like any other. Make sure you use supported types, and beyond that everything should work just fine.
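If you do want to go the UDF route anyway, here is a hedged sketch. The helper encode_gender below is hypothetical and simply mirrors the dummy coding from the question for a single column; it reuses the example df from above and assumes a Spark 2.x+ SparkSession named spark:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

def encode_gender(value):
    cats = ["1", "2"]  # categories taken from the question's example dict
    value = str(value).lower()
    return Vectors.dense([1.0 if value == cat else 0.0 for cat in cats])

# as a DataFrame column
encode_gender_udf = udf(encode_gender, VectorUDT())
df.withColumn("gender_dummies", encode_gender_udf("gender")).show()

# or registered for use in a Spark SQL query
spark.udf.register("encode_gender", encode_gender, VectorUDT())
df.createOrReplaceTempView("users")
spark.sql("SELECT *, encode_gender(gender) AS gender_dummies FROM users").show()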

take the RDD resulting from the map described above and add it as a new column to the user_data dataframe?

from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField

schema = StructType([StructField("features", VectorUDT(), True)])
row = Row("features")
result.map(lambda x: row(DenseVector(x))).toDF(schema)

Note:

For Spark 1.x, replace pyspark.ml.linalg with pyspark.mllib.linalg.
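If you then need the original user_data columns alongside these vectors, one hedged option (relying on the fact that result was produced by mapping over the same RDD, so zip lines up partition by partition) is:

from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

# pair each source row with its feature array, then rebuild a DataFrame
with_features = data.user_data.rdd.zip(result).map(
    lambda pair: Row(**dict(pair[0].asDict(),
                            feature_array=DenseVector(pair[1]))))
user_data_with_features = with_features.toDF()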

