Create a custom Transformer in PySpark ML


Problem Description


I am new to Spark SQL DataFrames and ML on them (PySpark). How can I create a custom tokenizer, which for example removes stop words and uses some libraries from nltk? Can I extend the default one?

Solution

Can I extend the default one?

Not really. The default Tokenizer is a subclass of pyspark.ml.wrapper.JavaTransformer and, like the other transformers and estimators from pyspark.ml.feature, delegates the actual processing to its Scala counterpart. Since you want to use Python, you should extend pyspark.ml.pipeline.Transformer directly.
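At its core, a Python-only transformer just subclasses pyspark.ml.Transformer and implements _transform(dataset); everything else in the full class below is parameter handling and persistence support. A bare-bones sketch (the column names are made up for illustration):

from pyspark.ml import Transformer
from pyspark.sql.functions import lower, col

class LowerCaseTransformer(Transformer):
    # Hypothetical minimal example: adds a "sentence_lower" column containing
    # the "sentence" column lower-cased. No Params, no persistence support.
    def _transform(self, dataset):
        return dataset.withColumn("sentence_lower", lower(col("sentence")))

The full implementation, with proper Params and persistence support, follows: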

import nltk

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
# Available in PySpark >= 2.3.0 
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class NLTKWordPunctTokenizer(
        Transformer, HasInputCol, HasOutputCol,
        # Credits https://stackoverflow.com/a/52467470
        # by https://stackoverflow.com/users/234944/benjamin-manns
        DefaultParamsReadable, DefaultParamsWritable):
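    """
    Python-only Transformer that tokenizes the input column with
    nltk.tokenize.wordpunct_tokenize and drops the configured stop words.
    """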

    stopwords = Param(Params._dummy(), "stopwords", "stopwords",
                      typeConverter=TypeConverters.toListString)


    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super(NLTKWordPunctTokenizer, self).__init__()
        # The typed `stopwords` Param is declared at class level above;
        # here only its default value needs to be set.
        self._setDefault(stopwords=[])
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, stopwords=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setStopwords(self, value):
        return self._set(stopwords=list(value))

    def getStopwords(self):
        return self.getOrDefault(self.stopwords)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _transform(self, dataset):
        stopwords = set(self.getStopwords())

        def f(s):
            tokens = nltk.tokenize.wordpunct_tokenize(s)
            return [t for t in tokens if t.lower() not in stopwords]

        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))
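A practical note: wordpunct_tokenize is purely regex-based, but the stop-word list used in the example below comes from the NLTK data distribution, and the UDF itself runs on the executors, so nltk must be importable there as well. A minimal setup sketch, assuming the stopwords corpus has not been downloaded yet:

import nltk

# One-time download of the English stop-word list used in the example below
# (only needed on the driver, where the list is built).
nltk.download('stopwords')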

Example usage (data from ML - Features):

sentenceDataFrame = spark.createDataFrame([
  (0, "Hi I heard about Spark"),
  (0, "I wish Java could use case classes"),
  (1, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = NLTKWordPunctTokenizer(
    inputCol="sentence", outputCol="words",  
    stopwords=nltk.corpus.stopwords.words('english'))

tokenizer.transform(sentenceDataFrame).show()
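Because the class mixes in DefaultParamsReadable and DefaultParamsWritable, it can also be used as an ordinary Pipeline stage and survive a save/load round trip (Spark >= 2.3). A small sketch with made-up paths:

from pyspark.ml import Pipeline, PipelineModel

pipeline = Pipeline(stages=[tokenizer])
model = pipeline.fit(sentenceDataFrame)

# Persist and reload; the stopwords parameter is restored from the saved metadata.
model.write().overwrite().save("/tmp/nltk_tokenizer_pipeline")
reloaded = PipelineModel.load("/tmp/nltk_tokenizer_pipeline")
reloaded.transform(sentenceDataFrame).show()

# The transformer can also be saved and loaded on its own:
tokenizer.write().overwrite().save("/tmp/nltk_tokenizer")
loaded = NLTKWordPunctTokenizer.load("/tmp/nltk_tokenizer")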

For a custom Python Estimator, see How to Roll a Custom Estimator in PySpark mllib.

⚠ This answer depends on internal APIs and is compatible with Spark 2.0.3, 2.1.1, 2.2.0 or later (SPARK-19348). For code compatible with earlier Spark versions, please see revision 8.
