How to create a custom Estimator in PySpark

Question

I am trying to build a simple custom Estimator in PySpark MLlib. I have seen here that it is possible to write a custom Transformer, but I am not sure how to do it for an Estimator. I also don't understand what @keyword_only does, or why I need so many setters and getters. Scikit-learn seems to have proper documentation for custom models (see here), but PySpark doesn't.

Pseudocode for an example model:

class NormalDeviation():
    def __init__(self, threshold=3):
        self.threshold = threshold
    def fit(self, x, y=None):
        self.model = {'mean': x.mean(), 'std': x.std()}
    def predict(self, x):
        return (x - self.model['mean']) > self.threshold * self.model['std']
    def decision_function(self, x):  # does ml-lib support this?
        pass

Answer

Generally speaking there is no documentation, because as of Spark 1.6 / 2.0 most of the related API is not intended to be public. That should change in Spark 2.1.0 (see SPARK-7146).

The API is relatively complex because it has to follow specific conventions in order to make a given Transformer or Estimator compatible with the Pipeline API. Some of these methods may be required for features like reading and writing or grid search. Others, like keyword_only, are just simple helpers and not strictly required.
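
For intuition, keyword_only (from pyspark import keyword_only) does one small thing: it forces the decorated method to be called with keyword arguments only, and records exactly the arguments the caller passed in self._input_kwargs, so that __init__ and setParams can forward user-supplied values (and only those) to _set. A simplified sketch of the idea, not the exact library source:

from functools import wraps

def keyword_only(func):
    # Reject positional arguments and stash the explicitly passed
    # keyword arguments on the instance for later use by setParams.
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if len(args) > 0:
            raise TypeError("Method %s forces keyword arguments." % func.__name__)
        self._input_kwargs = kwargs
        return func(self, **kwargs)
    return wrapper

Because defaults in the method signature never appear in **kwargs, params the user did not pass stay unset and fall back to whatever was registered with _setDefault.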

Assuming you have defined the following mix-in for the mean parameter:

from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):

    mean = Param(Params._dummy(), "mean", "mean", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasMean, self).__init__()

    def setMean(self, value):
        return self._set(mean=value)

    def getMean(self):
        return self.getOrDefault(self.mean)

the standard deviation parameter:

class HasStandardDeviation(Params):

    standardDeviation = Param(Params._dummy(),
        "standardDeviation", "standardDeviation", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasStandardDeviation, self).__init__()

    def setStddev(self, value):
        return self._set(standardDeviation=value)

    def getStddev(self):
        return self.getOrDefault(self.standardDeviation)

and the threshold:

class HasCenteredThreshold(Params):

    centeredThreshold = Param(Params._dummy(),
            "centeredThreshold", "centeredThreshold",
            typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasCenteredThreshold, self).__init__()

    def setCenteredThreshold(self, value):
        return self._set(centeredThreshold=value)

    def getCenteredThreshold(self):
        return self.getOrDefault(self.centeredThreshold)
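
These three mix-ins follow the same pattern as the built-in ones in pyspark.ml.param.shared: a Param declared at class level, a setter going through _set, and a getter going through getOrDefault, so values live in the stage's param map where features like copy and grid search can see them. As a hypothetical quick check (the _Probe class is only for illustration, not part of the answer):

class _Probe(HasMean, HasStandardDeviation, HasCenteredThreshold):
    pass

p = _Probe()
p.setMean(0.0)              # stored in the param map via _set
p.setStddev(1.0)
p.setCenteredThreshold(3.0)
print(p.getMean(), p.getStddev(), p.getCenteredThreshold())
# 0.0 1.0 3.0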

you could create a basic Estimator as follows:

from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable 
from pyspark import keyword_only  

class NormalDeviation(Estimator, HasInputCol, 
        HasPredictionCol, HasCenteredThreshold,
        # Available in PySpark >= 2.3.0 
        # Credits https://stackoverflow.com/a/52467470
        # by https://stackoverflow.com/users/234944/benjamin-manns
        DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        super(NormalDeviation, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol`.
        """
        return self._set(predictionCol=value)

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        kwargs = self._input_kwargs
        return self._set(**kwargs)        

    def _fit(self, dataset):
        c = self.getInputCol()
        mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
        return NormalDeviationModel(
            inputCol=c, mean=mu, standardDeviation=sigma, 
            centeredThreshold=self.getCenteredThreshold(),
            predictionCol=self.getPredictionCol())


class NormalDeviationModel(Model, HasInputCol, HasPredictionCol,
        HasMean, HasStandardDeviation, HasCenteredThreshold,
        DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None,
                mean=None, standardDeviation=None,
                centeredThreshold=None):
        super(NormalDeviationModel, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)  

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None,
                mean=None, standardDeviation=None,
                centeredThreshold=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)           

    def _transform(self, dataset):
        x = self.getInputCol()
        y = self.getPredictionCol()
        threshold = self.getCenteredThreshold()
        mu = self.getMean()
        sigma = self.getStddev()

        return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)     

Finally, it can be used as follows:

df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])

normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0)
model  = Pipeline(stages=[normal_deviation]).fit(df)

model.transform(df).show()
## +---+----+----------+
## | id|   x|prediction|
## +---+----+----------+
## |  1| 2.0|     false|
## |  2| 3.0|     false|
## |  3| 0.0|     false|
## |  4|99.0|      true|
## +---+----+----------+
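
Because the stages mix in DefaultParamsWritable / DefaultParamsReadable (PySpark >= 2.3.0), the fitted custom model can also be saved and reloaded. A minimal sketch, assuming the classes are importable on the driver at load time and /tmp/normal_deviation_model is a hypothetical writable path:

# Persist the fitted custom model stage and load it back.
model.stages[0].write().overwrite().save("/tmp/normal_deviation_model")
reloaded = NormalDeviationModel.load("/tmp/normal_deviation_model")
reloaded.transform(df).show()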
