pyspark.ml pipelines: are custom transformers necessary for basic preprocessing tasks?


Question


Getting started with pyspark.ml and the pipelines API, I find myself writing custom transformers for typical preprocessing tasks in order to use them in a pipeline. Examples:

from pyspark.ml import Pipeline, Transformer


class CustomTransformer(Transformer):
    # lazy workaround - a Transformer needs to have these attributes
    _defaultParamMap = dict()
    _paramMap = dict()
    _params = dict()


class ColumnSelector(CustomTransformer):
    """Transformer that selects a subset of columns
    - to be used as pipeline stage"""

    def __init__(self, columns):
        self.columns = columns

    def _transform(self, data):
        return data.select(self.columns)


class ColumnRenamer(CustomTransformer):
    """Transformer that renames one column."""

    def __init__(self, rename):
        self.rename = rename

    def _transform(self, data):
        (colNameBefore, colNameAfter) = self.rename
        return data.withColumnRenamed(colNameBefore, colNameAfter)


class NaDropper(CustomTransformer):
    """Transformer that drops rows containing at least one null element."""

    def __init__(self, cols=None):
        self.cols = cols

    def _transform(self, data):
        return data.dropna(subset=self.cols)


class ColumnCaster(CustomTransformer):
    """Transformer that casts one column to a given type."""

    def __init__(self, col, toType):
        self.col = col
        self.toType = toType

    def _transform(self, data):
        return data.withColumn(self.col, data[self.col].cast(self.toType))
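
For completeness, this is roughly how I chain them (a minimal sketch; the SparkSession, column names, and sample data below are invented for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("1", None), ("2", 3.0)], ["id", "value"])

pipeline = Pipeline(stages=[
    ColumnSelector(["id", "value"]),
    NaDropper(["value"]),
    ColumnCaster("id", "int"),
])
# Transformers have no fitting step, so fit() merely assembles a PipelineModel.
result = pipeline.fit(df).transform(df)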


They work, but I was wondering if this is a pattern or antipattern - are such transformers a good way to work with the pipeline API? Was it necessary to implement them, or is equivalent functionality provided somewhere else?

Answer


I'd say it is primarily opinion-based, although it looks unnecessarily verbose, and Python Transformers don't integrate well with the rest of the Pipeline API.


It is also worth pointing out that everything you have here can be easily achieved with SQLTransformer. For example:

from pyspark.ml.feature import SQLTransformer

def column_selector(columns):
    # __THIS__ is a placeholder for the DataFrame passed to transform()
    return SQLTransformer(
        statement="SELECT {} FROM __THIS__".format(", ".join(columns))
    )

def na_dropper(columns):
    return SQLTransformer(
        statement="SELECT * FROM __THIS__ WHERE {}".format(
            " AND ".join(["{} IS NOT NULL".format(x) for x in columns])
        )
    )
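
These factory functions then slot into a Pipeline like any built-in stage; for example (reusing the illustrative df from the question):

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    column_selector(["id", "value"]),
    na_dropper(["value"]),
])
result = pipeline.fit(df).transform(df)

An added benefit is that SQLTransformer is a Java-backed built-in, so unlike a Python-only workaround the resulting pipeline can also be persisted with the standard save/load machinery.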


With a little bit of effort you can use SQLAlchemy with the Hive dialect to avoid handwritten SQL.
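
Below is a rough, untested sketch of that idea (SQLAlchemy 1.x-style select(); the quoted_name workaround and the optional PyHive dialect are my assumptions, not part of the original answer):

from sqlalchemy import column, select, table
from sqlalchemy.sql.elements import quoted_name
from pyspark.ml.feature import SQLTransformer

def column_selector(columns):
    # __THIS__ must survive as a bare identifier; quote=False stops SQLAlchemy
    # from double-quoting the mixed-case name, which Spark SQL would reject.
    this = table(quoted_name("__THIS__", quote=False))
    query = select([column(c) for c in columns]).select_from(this)
    # For Hive-flavoured SQL, one could compile with PyHive's dialect instead:
    # from pyhive.sqlalchemy_hive import HiveDialect
    # statement = str(query.compile(dialect=HiveDialect()))
    return SQLTransformer(statement=str(query))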
