pyspark.ml pipelines: are custom transformers necessary for basic preprocessing tasks?


Question

Getting started with pyspark.ml and the pipelines API, I find myself writing custom transformers for typical preprocessing tasks in order to use them in a pipeline. Examples:

from pyspark.ml import Pipeline, Transformer


class CustomTransformer(Transformer):
    # lazy workaround - a transformer needs to have these attributes
    _defaultParamMap = dict()
    _paramMap = dict()
    _params = dict()


class ColumnSelector(CustomTransformer):
    """Transformer that selects a subset of columns
    - to be used as pipeline stage"""

    def __init__(self, columns):
        self.columns = columns

    def _transform(self, data):
        return data.select(self.columns)


class ColumnRenamer(CustomTransformer):
    """Transformer that renames one column"""

    def __init__(self, rename):
        self.rename = rename

    def _transform(self, data):
        (colNameBefore, colNameAfter) = self.rename
        return data.withColumnRenamed(colNameBefore, colNameAfter)


class NaDropper(CustomTransformer):
    """Drops rows with at least one not-a-number element"""

    def __init__(self, cols=None):
        self.cols = cols

    def _transform(self, data):
        return data.dropna(subset=self.cols)


class ColumnCaster(CustomTransformer):
    """Casts one column to a given type"""

    def __init__(self, col, toType):
        self.col = col
        self.toType = toType

    def _transform(self, data):
        return data.withColumn(self.col, data[self.col].cast(self.toType))
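
For concreteness, a minimal sketch of how these stages chain together (hypothetical data; assumes an existing SparkSession named spark):

df = spark.createDataFrame(
    [(1, "a", None), (2, "b", "3")], ["id", "label", "value"]
)

pipeline = Pipeline(stages=[
    ColumnSelector(["id", "value"]),   # keep a subset of columns
    NaDropper(cols=["value"]),         # drop rows with null in "value"
    ColumnCaster("value", "double"),   # cast the string column to double
])
result = pipeline.fit(df).transform(df)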

They work, but I was wondering whether this is a pattern or an antipattern - are such transformers a good way to work with the pipeline API? Was it necessary to implement them, or is equivalent functionality provided somewhere else?

Answer

I'd say it is primarily opinion-based, although it looks unnecessarily verbose, and Python Transformers don't integrate well with the rest of the Pipeline API (for example, a pure-Python Transformer like these cannot be persisted with Pipeline save/load unless it also implements the ML writer/reader interfaces).

It is also worth pointing out that everything you have here can be easily achieved with SQLTransformer. For example:

from pyspark.ml.feature import SQLTransformer

def column_selector(columns):
    return SQLTransformer(
        statement="SELECT {} FROM __THIS__".format(", ".join(columns))
    )

def na_dropper(columns):
    return SQLTransformer(
        statement="SELECT * FROM __THIS__ WHERE {}".format(
            " AND ".join(["{} IS NOT NULL".format(x) for x in columns])
        )
    )
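
Since SQLTransformer is a standard pyspark.ml Transformer, these factories drop straight into a Pipeline like any other stage (brief usage sketch, reusing the hypothetical df from above):

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    column_selector(["id", "value"]),
    na_dropper(["value"]),
])
result = pipeline.fit(df).transform(df)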

With a little bit of effort you can use SQLAlchemy with the Hive dialect to avoid handwritten SQL.
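
A rough sketch of that idea, assuming the PyHive package is installed (it provides a Hive dialect for SQLAlchemy); the 1.x-style select() call is shown and may differ in newer SQLAlchemy versions:

from sqlalchemy import column, select, table
from pyhive.sqlalchemy_hive import HiveDialect  # assumption: PyHive installed

def column_selector(columns):
    # Compose SELECT col1, col2, ... FROM __THIS__ without handwritten SQL.
    stmt = select([column(c) for c in columns]).select_from(table("__THIS__"))
    return SQLTransformer(statement=str(stmt.compile(dialect=HiveDialect())))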
