Spark.ml regressions do not calculate same models as scikit-learn

Problem description

I am setting up a very simple logistic regression problem in scikit-learn and in spark.ml, and the results diverge: the models they learn are different, but I can't figure out why (data is the same, model type is the same, regularization is the same...).

No doubt I am missing some setting on one side or the other. Which setting? How should I set up either scikit or spark.ml to find the same model as its counterpart?

I give the sklearn code and spark.ml code below. Both should be ready to cut-and-paste and run.

import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge

X = np.array([
    [-0.7306653538519616, 0.0],
    [0.6750417712898752, -0.4232874171873786],
    [0.1863463229359709, -0.8163423997075965],
    [-0.6719842051493347, 0.0],
    [0.9699938346531928, 0.0],
    [0.22759406190283604, 0.0],
    [0.9688721028330911, 0.0],
    [0.5993795346650845, 0.0],
    [0.9219423508390701, -0.8972778242305388],
    [0.7006904841584055, -0.5607635619919824]
])

y = np.array([
    0.0,
    1.0,
    1.0,
    0.0,
    1.0,
    1.0,
    1.0,
    0.0,
    0.0,
    0.0
])

m, n = X.shape

# Add intercept term to simulate inputs to GameEstimator
X_with_intercept = np.hstack((X, np.ones(m)[:,np.newaxis]))

l = 0.3
e = LogisticRegression(
    fit_intercept=False,
    penalty='l2',
    C=1/l,
    max_iter=100,
    tol=1e-11)

e.fit(X_with_intercept, y)

print(e.coef_)
# => [[ 0.98662189  0.45571052 -0.23467255]]

# L2-regularized linear regression is called Ridge in sklearn
e = Ridge(
    fit_intercept=False,
    alpha=l,
    max_iter=100,
    tol=1e-11)

e.fit(X_with_intercept, y)

print(e.coef_)
# =>[ 0.32155545  0.17904355  0.41222418]

The spark.ml code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.SQLContext

object TestSparkRegression {
  def main(args: Array[String]): Unit = {
    import org.apache.log4j.{Level, Logger}

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)

    val sparkTrainingData = new SQLContext(sc)
      .createDataFrame(Seq(
        LabeledPoint(0.0, Vectors.dense(-0.7306653538519616, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.6750417712898752, -0.4232874171873786)),
        LabeledPoint(1.0, Vectors.dense(0.1863463229359709, -0.8163423997075965)),
        LabeledPoint(0.0, Vectors.dense(-0.6719842051493347, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.9699938346531928, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.22759406190283604, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.9688721028330911, 0.0)),
        LabeledPoint(0.0, Vectors.dense(0.5993795346650845, 0.0)),
        LabeledPoint(0.0, Vectors.dense(0.9219423508390701, -0.8972778242305388)),
        LabeledPoint(0.0, Vectors.dense(0.7006904841584055, -0.5607635619919824))))
      .toDF("label", "features")

    val logisticModel = new LogisticRegression()
      .setRegParam(0.3)
      .setLabelCol("label")
      .setFeaturesCol("features")
      .fit(sparkTrainingData)

    println(s"Spark logistic model coefficients: ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
    // Spark logistic model coefficients: [0.5451588538376263,0.26740606573584713] Intercept: -0.13897955358689987

    val linearModel = new LinearRegression()
      .setRegParam(0.3)
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setSolver("l-bfgs")
      .fit(sparkTrainingData)

    println(s"Spark linear model coefficients: ${linearModel.coefficients} Intercept: ${linearModel.intercept}")
    // Spark linear model coefficients: [0.19852664861346023,0.11501200541407802] Intercept: 0.45464906876832323

    sc.stop()
  }
}

Answer

You need to do the following:

  1. Standardize both the Python and the Spark dataframes first. Spark applies standardization internally by default. Take care to account for the difference in the standard-deviation formula used by the StandardScaler implementations in the two packages.

  2. Scale the regularization to match the loss formulation. For logistic regression, Spark minimizes the average of the log-loss (the denominator is the sum of the sample weights, i.e. the number of training instances when all weights are 1), whereas sklearn minimizes the sum of the log-loss. For linear regression, Spark, unlike sklearn, puts a 1/(2n) factor on the sum-of-squared-errors term. The Spark regularization parameter therefore has to be scaled down accordingly: by 1/10 for logistic regression and by 1/20 for linear regression in this example (see the short numeric sketch after this list).
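A short numeric sketch of this rescaling (a minimal illustration only; n = 10 and l = 0.3 are taken from the question, and the variable names are purely illustrative):

n = 10                      # number of training instances (all weights are 1)
l = 0.3                     # regularization strength used on the sklearn side

# Spark averages the log-loss over the n instances while sklearn sums it,
# so the matching Spark regParam for logistic regression is l / n.
spark_logistic_regparam = l / n          # 0.03

# Spark puts a 1/(2n) factor on the sum of squared errors while sklearn's Ridge
# does not, so the matching Spark regParam for linear regression is l / (2 * n).
spark_linear_regparam = l / (2.0 * n)    # 0.015

# sklearn's StandardScaler divides by the population standard deviation (ddof=0)
# while Spark's divides by the sample standard deviation (ddof=1); this factor
# converts sklearn-scaled features to Spark's scaling, as used in the code below.
std_correction = ((n - 1.0) / n) ** 0.5  # = 3/sqrt(10) for n = 10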

Scikit-learn code

import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge

X = np.array([
    [-0.7306653538519616, 0.0],
    [0.6750417712898752, -0.4232874171873786],
    [0.1863463229359709, -0.8163423997075965],
    [-0.6719842051493347, 0.0],
    [0.9699938346531928, 0.0],
    [0.22759406190283604, 0.0],
    [0.9688721028330911, 0.0],
    [0.5993795346650845, 0.0],
    [0.9219423508390701, -0.8972778242305388],
    [0.7006904841584055, -0.5607635619919824]
])

y = np.array([
    0.0,
    1.0,
    1.0,
    0.0,
    1.0,
    1.0,
    1.0,
    0.0,
    0.0,
    0.0
])

m, n = X.shape


from sklearn.preprocessing import StandardScaler

## sqrt(n-1)/sqrt(n) factor for getting the same standardization as Spark:
## sklearn's StandardScaler divides by the population std (ddof=0) while Spark's
## divides by the sample std (ddof=1), hence the 3/sqrt(10) = sqrt(9/10) factor here
Xsc=StandardScaler().fit_transform(X)*3.0/np.sqrt(10.0)

l = 0.3
e = LogisticRegression(
    fit_intercept=True,
    penalty='l2',
    C=1/l,
    max_iter=100,
    tol=1e-11,
    solver='lbfgs', verbose=1)

e.fit(Xsc, y)

print(e.coef_, e.intercept_)
# => [[ 0.82122437 0.32615256]] [-0.01181534]

#e.get_params(deep=True)

# L2-regularized linear regression is called Ridge in sklearn
e = Ridge(
    fit_intercept=True,
    alpha=l,
    max_iter=100,
    tol=1e-11)

e.fit(Xsc, y)

print(e.coef_, e.intercept_)
# =>[ 0.21310109 0.09203616] 0.5

Spark code (refactored to use the ML API)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.StandardScaler

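// Note: this snippet assumes an existing SparkContext named `sc` (e.g. one provided by spark-shell).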
val sparkTrainingData_orig = new SQLContext(sc).
  createDataFrame(Seq(
    (0.0, Vectors.dense(Array(-0.7306653538519616, 0.0))),
    (1.0, Vectors.dense(Array(0.6750417712898752, -0.4232874171873786))),
    (1.0, Vectors.dense(Array(0.1863463229359709, -0.8163423997075965))),
    (0.0, Vectors.dense(Array(-0.6719842051493347, 0.0))),
    (1.0, Vectors.dense(Array(0.9699938346531928, 0.0))),
    (1.0, Vectors.dense(Array(0.22759406190283604, 0.0))),
    (1.0, Vectors.dense(Array(0.9688721028330911, 0.0))),
    (0.0, Vectors.dense(Array(0.5993795346650845, 0.0))),
    (0.0, Vectors.dense(Array(0.9219423508390701, -0.8972778242305388))),
    (0.0, Vectors.dense(Array(0.7006904841584055, -0.5607635619919824))))).
  toDF("label", "features_orig")

val sparkTrainingData=new StandardScaler().
  setWithMean(true).
  setInputCol("features_orig").
  setOutputCol("features").
  fit(sparkTrainingData_orig).
  transform(sparkTrainingData_orig)

// Make regularization 0.3/10 = 0.03
val logisticModel = new LogisticRegression().
  setRegParam(0.03).
  setLabelCol("label").
  setFeaturesCol("features").
  setTol(1e-12).
  setMaxIter(100).
  fit(sparkTrainingData)

println(s"Spark logistic model coefficients: ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
// Spark logistic model coefficients: [0.8212244419577079,0.32615245441495727] Intercept: -0.011815325216668142

// Make regularization 0.3/20 = 0.015
val linearModel = new LinearRegression().
  setRegParam(0.015).
  setLabelCol("label").
  setFeaturesCol("features").
  setTol(1e-12).
  setMaxIter(100).
  fit(sparkTrainingData)

println(s"Spark linear model coefficients: ${linearModel.coefficients} Intercept: ${linearModel.intercept}")
// Spark linear model coefficients: [0.21394341729353747,0.09257340293212045] Intercept: 0.5
