How to Roll a Custom Estimator in PySpark mllib


Problem Description


I am trying to build a simple custom Estimator in pySpark mllib. I have seen here (http://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml/32337101?noredirect=1#comment62052435_32337101) that it is possible to write a custom Transformer, but I am not sure how to do it for an Estimator. I also don't understand what @keyword_only does, or why I need so many setters and getters. Scikit-learn seems to have proper documentation for custom models (see here), but pySpark doesn't.

Pseudo code of an example model:

class NormalDeviation():
    def __init__(self, threshold=3):
        self.threshold = threshold

    def fit(self, x, y=None):
        self.model = {'mean': x.mean(), 'std': x.std()}

    def predict(self, x):
        return (x - self.model['mean']) > self.threshold * self.model['std']

    def decision_function(self, x):  # does ml-lib support this?
        ...

Solution

Generally speaking, there is no documentation because, as of Spark 1.6 / 2.0, most of the related API is not intended to be public. This should change in Spark 2.1.0 (see SPARK-7146).

The API is relatively complex because it has to follow specific conventions in order to make a given Transformer or Estimator compatible with the Pipeline API. Some of these methods may be required for features like reading and writing or grid search. Others, like keyword_only, are just simple helpers and not strictly required.
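For illustration, here is a minimal sketch of the keyword_only pattern (hedged: IdentityTransformer is a hypothetical example class, and recent Spark versions expose the captured arguments as self._input_kwargs, while older releases attached them to the decorated function instead):

from pyspark import keyword_only
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol


class IdentityTransformer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(IdentityTransformer, self).__init__()
        # keyword_only rejects positional arguments and records the keyword
        # arguments that were actually passed, so they can be forwarded to
        # setParams in a single call.
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        # Trivial behaviour: copy the input column to the output column.
        return dataset.withColumn(self.getOutputCol(), dataset[self.getInputCol()])

Without the decorator you would have to pull the arguments apart yourself; with it, __init__ and setParams accept the same keyword-only signature and share one code path.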

Assuming you have defined the following mix-ins, one for the mean parameter:

from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):

    mean = Param(Params._dummy(), "mean", "mean", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasMean, self).__init__()

    def setMean(self, value):
        return self._set(mean=value)

    def getMean(self):
        return self.getOrDefault(self.mean)

one for the standard deviation parameter:

class HasStandardDeviation(Params):

    stddev = Param(Params._dummy(), "stddev", "stddev", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasStandardDeviation, self).__init__()

    def setStddev(self, value):
        return self._set(stddev=value)

    def getStddev(self):
        return self.getOrDefault(self.stddev)

and one for the threshold:

class HasCenteredThreshold(Params):

    centered_threshold = Param(Params._dummy(),
            "centered_threshold", "centered_threshold",
            typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasCenteredThreshold, self).__init__()

    def setCenteredThreshold(self, value):
        return self._set(centered_threshold=value)

    def getCenteredThreshold(self):
        return self.getOrDefault(self.centered_threshold)
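One caveat worth noting: none of these mix-ins declare default values, and getOrDefault raises for a param that was neither set nor given a default. If you want a fallback, the mix-in can call the internal _setDefault helper; a quick hedged sketch, with a hypothetical subclass name and an assumed default of 3.0:

class HasCenteredThresholdWithDefault(HasCenteredThreshold):

    def __init__(self):
        super(HasCenteredThresholdWithDefault, self).__init__()
        # With a default in place, getCenteredThreshold works even before
        # setCenteredThreshold has been called.
        self._setDefault(centered_threshold=3.0)

An explicitly set value still takes precedence over the default.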

you could create a basic Estimator as follows:

class NormalDeviation(Estimator, HasInputCol, 
        HasPredictionCol, HasCenteredThreshold):

    def _fit(self, dataset):
        c = self.getInputCol()
        mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
        return (NormalDeviationModel()
            .setInputCol(c)
            .setMean(mu)
            .setStddev(sigma)
            .setCenteredThreshold(self.getCenteredThreshold())
            .setPredictionCol(self.getPredictionCol()))

class NormalDeviationModel(Model, HasInputCol, HasPredictionCol,
        HasMean, HasStandardDeviation, HasCenteredThreshold):

    def _transform(self, dataset):
        x = self.getInputCol()
        y = self.getPredictionCol()
        threshold = self.getCenteredThreshold()
        mu = self.getMean()
        sigma = self.getStddev()

        return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)
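As a design note, the estimator above copies every shared param onto the model by hand. Since NormalDeviation and NormalDeviationModel share the same mix-ins, one could instead lean on the internal Params._copyValues helper (internal API, so subject to change; a hedged variant of _fit):

    def _fit(self, dataset):
        c = self.getInputCol()
        mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
        # _copyValues copies the params shared by estimator and model
        # (inputCol, predictionCol, centered_threshold), so only the
        # fitted statistics need to be set explicitly.
        return self._copyValues(NormalDeviationModel()
            .setMean(mu)
            .setStddev(sigma))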

Finally, it could be used as follows:

df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])

normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0)
model  = Pipeline(stages=[normal_deviation]).fit(df)

model.transform(df).show()
## +---+----+----------+
## | id|   x|prediction|
## +---+----+----------+
## |  1| 2.0|     false|
## |  2| 3.0|     false|
## |  3| 0.0|     false|
## |  4|99.0|      true|
## +---+----+----------+
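
Wrapping a single stage in a Pipeline is optional here; the public Estimator.fit method, which handles param maps and dispatches to _fit, can also be called directly:

model = normal_deviation.fit(df)
model.transform(df).show()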
