Encode and assemble multiple features in PySpark


Problem description


I have a Python class that I'm using to load and process some data in Spark. Among various things I need to do, I'm generating a list of dummy variables derived from various columns in a Spark dataframe. My problem is that I'm not sure how to properly define a User Defined Function to accomplish what I need.

I do currently have a method that, when mapped over the underlying dataframe RDD, solves half the problem (remember that this is a method in a larger data_processor class):

def build_feature_arr(self,table):
    # this dict has keys for all the columns for which I need dummy coding
    categories = {'gender':['1','2'], ..}

    # there are actually two different dataframes that I need to do this for; this just specifies which one I'm looking at, and grabs the relevant features from a config file
    if table == 'users':
        iter_over = self.config.dyadic_features_to_include
    elif table == 'activty':
        iter_over = self.config.user_features_to_include

    def _build_feature_arr(row):
        result = []
        row = row.asDict()
        for col in iter_over:
            column_value = str(row[col]).lower()
            cats = categories[col]
            result += [1 if column_value and cat==column_value else 0 for cat in cats]
        return result
    return _build_feature_arr

Essentially what this does is, for the specified dataframe, takes the categorical variable values for the specified columns, and returns a list of the values of these new dummy variables. That means the following code:

data = data_processor(init_args)
result = data.user_data.rdd.map(self.build_feature_arr('users'))

returns something like:

In [39]: result.take(10)
Out[39]:
[[1, 0, 0, 0, 1, 0],
 [1, 0, 0, 1, 0, 0],
 [1, 0, 0, 0, 0, 0],
 [1, 0, 1, 0, 0, 0],
 [1, 0, 0, 1, 0, 0],
 [1, 0, 0, 1, 0, 0],
 [0, 1, 1, 0, 0, 0],
 [1, 0, 1, 1, 0, 0],
 [1, 0, 0, 1, 0, 0],
 [1, 0, 0, 0, 0, 1]]

This is exactly what I want in terms of generating the list of dummy variables I want, but here's my question: How can I either (a) make a UDF with similar functionality that I can use in a Spark SQL query (or some other way, I suppose), or (b) take the RDD resulting from the map described above and add it as a new column to the user_data dataframe?

Either way, what I need to do is generate a new dataframe containing the columns from user_data, along with a new column (let's call it feature_array) containing the output of the function above (or something functionally equivalent).

Solution

Well, you can write a UDF, but why would you? There are already quite a few tools designed to handle this category of tasks:

from pyspark.sql import Row
from pyspark.mllib.linalg import DenseVector

row = Row("gender", "foo", "bar")

df = sc.parallelize([
  row("0", 3.0, DenseVector([0, 2.1, 1.0])),
  row("1", 1.0, DenseVector([0, 1.1, 1.0])),
  row("1", -1.0, DenseVector([0, 3.4, 0.0])),
  row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()

First of all StringIndexer.

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)
indexed_df = indexer.transform(df)
indexed_df.drop("bar").show()

## +------+----+--------------+
## |gender| foo|gender_numeric|
## +------+----+--------------+
## |     0| 3.0|           0.0|
## |     1| 1.0|           1.0|
## |     1|-1.0|           1.0|
## |     0|-3.0|           0.0|
## +------+----+--------------+
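
A quick sanity check, assuming a reasonably recent Spark release: the fitted StringIndexerModel exposes the learned label-to-index mapping (most frequent label first), which is handy for verifying the encoding above.

# `labels` is a property of the fitted StringIndexerModel; it may not be
# exposed in very old PySpark versions.
print(indexer.labels)  # e.g. ['0', '1'] for this toy dataframe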

Next OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector")
encoded_df = encoder.transform(indexed_df)
encoded_df.drop("bar").show()

## +------+----+--------------+-------------+
## |gender| foo|gender_numeric|gender_vector|
## +------+----+--------------+-------------+
## |     0| 3.0|           0.0|(1,[0],[1.0])|
## |     1| 1.0|           1.0|    (1,[],[])|
## |     1|-1.0|           1.0|    (1,[],[])|
## |     0|-3.0|           0.0|(1,[0],[1.0])|
## +------+----+--------------+-------------+
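
Note that with the defaults the last category is dropped, which is why gender "1" shows up as the empty sparse vector (1,[],[]) above. If you prefer an explicit slot for every category, OneHotEncoder accepts a dropLast flag (a sketch; note that in Spark 3.x the encoder is an estimator and needs a fit step first):

full_encoder = OneHotEncoder(inputCol="gender_numeric",
                             outputCol="gender_vector",
                             dropLast=False)
full_encoder.transform(indexed_df).drop("bar").show()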

VectorIndexer and VectorAssembler:

from pyspark.ml.feature import VectorIndexer, VectorAssembler

vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")
assembler = VectorAssembler(
    inputCols=["gender_vector", "bar_indexed", "foo"], outputCol="features")

encoded_df_with_indexed_bar = (vector_indexer
    .fit(encoded_df)
    .transform(encoded_df))

final_df = assembler.transform(encoded_df_with_indexed_bar)

Finally you can wrap all of that using pipelines:

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)

Arguably it is a much more robust and cleaner approach than writing everything from scratch. There are some caveats, especially when you need consistent encoding between different datasets. You can read more in the official documentation for StringIndexer and VectorIndexer.
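
As a minimal sketch of the "consistent encoding" point, assuming train_df and other_df are two hypothetical dataframes with the same schema as df: fit the pipeline once and reuse the fitted model everywhere, since fitting separately on each dataset can silently assign different indices to the same labels.

# Fit once, transform many -- the learned label/category mappings stay identical.
model = pipeline.fit(train_df)
train_features = model.transform(train_df)
other_features = model.transform(other_df)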

Regarding your questions:

make a UDF with similar functionality that I can use in a Spark SQL query (or some other way, I suppose)

It is just a UDF like any other. Make sure you use supported types, and beyond that everything should work just fine.
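
For completeness, a minimal sketch of that route, reusing the per-column logic from your question (gender_categories and the user_data dataframe are taken from your example; sqlContext.registerFunction is the pre-2.0 spelling, newer versions use spark.udf.register):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, IntegerType

gender_categories = ['1', '2']

def dummy_code(value, cats):
    # 0/1 indicator per category, mirroring _build_feature_arr for a single column
    value = str(value).lower()
    return [1 if value and value == cat else 0 for cat in cats]

gender_dummies = udf(lambda v: dummy_code(v, gender_categories),
                     ArrayType(IntegerType()))

# As a new dataframe column:
with_dummies = data.user_data.withColumn(
    "gender_dummies", gender_dummies(col("gender")))

# Or registered for use inside Spark SQL queries:
sqlContext.registerFunction(
    "gender_dummies",
    lambda v: dummy_code(v, gender_categories),
    ArrayType(IntegerType()))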

take the RDD resulting from the map described above and add it as a new column to the user_data dataframe?

from pyspark.mllib.linalg import VectorUDT
from pyspark.sql.types import StructType, StructField

schema = StructType([StructField("features", VectorUDT(), True)])
row = Row("features")
result.map(lambda x: row(DenseVector(x))).toDF(schema)
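
And if you really want to glue the mapped RDD back onto user_data as a column, one possible sketch (assuming there is no natural join key) is to zip both sides with an index and join on it; this costs an extra shuffle and relies on both RDDs sharing the same ordering, which holds here because result is a plain map over user_data.rdd:

indexed_features = result.zipWithIndex().map(
    lambda x: Row(row_id=x[1], feature_array=DenseVector(x[0]))).toDF()

indexed_users = data.user_data.rdd.zipWithIndex().map(
    lambda x: Row(row_id=x[1], **x[0].asDict())).toDF()

user_data_with_features = indexed_users.join(
    indexed_features, "row_id").drop("row_id")

If user_data already carries a unique id, joining on that id directly is of course simpler and safer.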
