Create a custom Transformer in PySpark ML
Problem description
I am new to Spark SQL DataFrames and ML on them (PySpark). How can I create a custom tokenizer, which for example removes stop words and uses some libraries from nltk? Can I extend the default one?
Can I extend the default one?
Not really. The default Tokenizer is a subclass of pyspark.ml.wrapper.JavaTransformer and, like the other transformers and estimators in pyspark.ml.feature, delegates the actual processing to its Scala counterpart. Since you want to use Python, you should extend pyspark.ml.pipeline.Transformer directly.
import nltk

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
# Available in PySpark >= 2.3.0
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType


class NLTKWordPunctTokenizer(
        Transformer, HasInputCol, HasOutputCol,
        # Credits https://stackoverflow.com/a/52467470
        # by https://stackoverflow.com/users/234944/benjamin-manns
        DefaultParamsReadable, DefaultParamsWritable):

    stopwords = Param(Params._dummy(), "stopwords", "stopwords",
                      typeConverter=TypeConverters.toListString)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super(NLTKWordPunctTokenizer, self).__init__()
        self.stopwords = Param(self, "stopwords", "")
        self._setDefault(stopwords=[])
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, stopwords=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setStopwords(self, value):
        return self._set(stopwords=list(value))

    def getStopwords(self):
        return self.getOrDefault(self.stopwords)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _transform(self, dataset):
        stopwords = set(self.getStopwords())

        def f(s):
            tokens = nltk.tokenize.wordpunct_tokenize(s)
            return [t for t in tokens if t.lower() not in stopwords]

        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))
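One practical note before the example below: the UDF calls nltk.tokenize.wordpunct_tokenize at run time, so the nltk package itself must be importable on every executor, while the stopword list is materialized on the driver when it is passed to the constructor. wordpunct_tokenize is regex-based and needs no corpus data, but nltk.corpus.stopwords does; a one-time download, assuming network access on the driver:

import nltk

# Fetch the corpus behind nltk.corpus.stopwords.words('english')
nltk.download('stopwords')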
Example usage (data from ML - Features):
sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = NLTKWordPunctTokenizer(
    inputCol="sentence", outputCol="words",
    stopwords=nltk.corpus.stopwords.words('english'))

tokenizer.transform(sentenceDataFrame).show()
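Because NLTKWordPunctTokenizer is a regular pyspark.ml stage, it composes with built-in stages, and the DefaultParams mixins (PySpark >= 2.3.0) give it save/load support. A brief sketch: CountVectorizer is only an illustrative downstream stage, and the save path is an arbitrary example location.

from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer

# Compose with a JVM-backed stage, as with any built-in Transformer
pipeline = Pipeline(stages=[
    tokenizer,
    CountVectorizer(inputCol="words", outputCol="features")])
pipeline.fit(sentenceDataFrame).transform(sentenceDataFrame).show()

# Round-trip the transformer via DefaultParamsWritable / DefaultParamsReadable
tokenizer.write().overwrite().save("/tmp/nltk_tokenizer")
loaded = NLTKWordPunctTokenizer.load("/tmp/nltk_tokenizer")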
For a custom Python Estimator, see How to Roll a Custom Estimator in PySpark mllib.
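The linked answer develops the full pattern; the core of it is that an Estimator implements _fit(dataset) and returns a Model (itself a Transformer) that carries the learned state. A rough skeleton under that pattern, with made-up names (MeanImputer, MeanImputerModel) and simplified Param handling, not the linked answer's code:

import pyspark.sql.functions as F
from pyspark.ml import Estimator, Model
from pyspark.ml.param.shared import HasInputCol, HasOutputCol


class MeanImputerModel(Model, HasInputCol, HasOutputCol):

    def __init__(self, mean, inputCol, outputCol):
        super(MeanImputerModel, self).__init__()
        self.mean = mean  # plain attribute; a full version would use a Param
        self._set(inputCol=inputCol, outputCol=outputCol)

    def _transform(self, dataset):
        in_col = dataset[self.getInputCol()]
        # Replace NULLs in the input column with the mean learned by fit()
        return dataset.withColumn(
            self.getOutputCol(), F.coalesce(in_col, F.lit(self.mean)))


class MeanImputer(Estimator, HasInputCol, HasOutputCol):

    def __init__(self, inputCol, outputCol):
        super(MeanImputer, self).__init__()
        self._set(inputCol=inputCol, outputCol=outputCol)

    def _fit(self, dataset):
        # Compute the learned state on the driver, then freeze it in a Model
        mean = dataset.agg(F.avg(self.getInputCol())).first()[0]
        return MeanImputerModel(mean, self.getInputCol(), self.getOutputCol())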
⚠ This answer depends on internal APIs and is compatible with Spark 2.0.3, 2.1.1, 2.2.0 or later (SPARK-19348). For code compatible with earlier Spark versions, please see revision 8.