Apache Spark Machine Learning - can't get Estimator example to work

Question

I am having difficulty taking any of the example machine learning code from the Spark docs and actually getting it to run as a Java program. Whether it's my limited knowledge of Java, Maven, or Spark (or most likely all three), I can't find a useful explanation.

Take this example. To try to get this working, I have used the following project structure:

.
├── pom.xml
└── src
    └── main
        └── java
            └── SimpleEstimator.java

The Java file looks like this

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;


public class SimpleEstimator {
  public static void main(String[] args) {
    DataFrame training = sqlContext.createDataFrame(Arrays.asList(
      new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
      new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
      new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
      new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))
    ), LabeledPoint.class);

    LogisticRegression lr = new LogisticRegression();
    System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");

    lr.setMaxIter(10)
      .setRegParam(0.01);

    LogisticRegressionModel model1 = lr.fit(training);

    System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());

    ParamMap paramMap = new ParamMap()
      .put(lr.maxIter().w(20)) // Specify 1 Param.
      .put(lr.maxIter(), 30) // This overwrites the original maxIter.
      .put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.

    ParamMap paramMap2 = new ParamMap()
      .put(lr.probabilityCol().w("myProbability")); // Change output column name
    ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);

    LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
    System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());

    DataFrame test = sqlContext.createDataFrame(Arrays.asList(
      new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
      new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
      new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))
    ), LabeledPoint.class);


    DataFrame results = model2.transform(test);
    for (Row r: results.select("features", "label", "myProbability", "prediction").collect()) {
      System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
          + ", prediction=" + r.get(3));
    }
  }
}

and the pom file is as follows:

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-estimator</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Estimator</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>1.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>1.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>1.5.0</version>
    </dependency>
  </dependencies>
</project>

If I then run mvn package from the root of this directory, I get the following errors:

[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Simple Estimator 1.0
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ simple-estimator ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /Users/philip/study/spark/estimator/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ simple-estimator ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 1 source file to /Users/philip/study/spark/estimator/target/classes
[INFO] -------------------------------------------------------------
[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /Users/philip/study/spark/estimator/src/main/java/SimpleEstimator.java:[15,26] cannot find symbol
  symbol:   variable sqlContext
  location: class SimpleEstimator
[ERROR] /Users/philip/study/spark/estimator/src/main/java/SimpleEstimator.java:[44,22] cannot find symbol
  symbol:   variable sqlContext
  location: class SimpleEstimator
[INFO] 2 errors
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.567 s
[INFO] Finished at: 2015-09-16T16:54:20+01:00
[INFO] Final Memory: 36M/422M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project simple-estimator: Compilation failure: Compilation failure:
[ERROR] /Users/philip/study/spark/estimator/src/main/java/SimpleEstimator.java:[15,26] cannot find symbol
[ERROR] symbol:   variable sqlContext
[ERROR] location: class SimpleEstimator
[ERROR] /Users/philip/study/spark/estimator/src/main/java/SimpleEstimator.java:[44,22] cannot find symbol
[ERROR] symbol:   variable sqlContext
[ERROR] location: class SimpleEstimator
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException


Update

Thanks to @holden, I made sure to add these lines:

// additional imports
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SQLContext;

// added these as starting lines in class
SparkConf conf = new SparkConf().setAppName("Simple Estimator");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

which progressed things a bit, but now I get the following error:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project simple-estimator: Compilation failure
[ERROR] /Users/philip/study/spark/estimator/src/main/java/SimpleEstimator.java:[21,36] no suitable method found for createDataFrame(java.util.List<org.apache.spark.mllib.regression.LabeledPoint>,java.lang.Class<org.apache.spark.mllib.regression.LabeledPoint>)
[ERROR] method org.apache.spark.sql.SQLContext.<A>createDataFrame(org.apache.spark.rdd.RDD<A>,scala.reflect.api.TypeTags.TypeTag<A>) is not applicable
[ERROR] (cannot infer type-variable(s) A
[ERROR] (argument mismatch; java.util.List<org.apache.spark.mllib.regression.LabeledPoint> cannot be converted to org.apache.spark.rdd.RDD<A>))
[ERROR] method org.apache.spark.sql.SQLContext.<A>createDataFrame(scala.collection.Seq<A>,scala.reflect.api.TypeTags.TypeTag<A>) is not applicable
[ERROR] (cannot infer type-variable(s) A
[ERROR] (argument mismatch; java.util.List<org.apache.spark.mllib.regression.LabeledPoint> cannot be converted to scala.collection.Seq<A>))
[ERROR] method org.apache.spark.sql.SQLContext.createDataFrame(org.apache.spark.rdd.RDD<org.apache.spark.sql.Row>,org.apache.spark.sql.types.StructType) is not applicable
[ERROR] (argument mismatch; java.util.List<org.apache.spark.mllib.regression.LabeledPoint> cannot be converted to org.apache.spark.rdd.RDD<org.apache.spark.sql.Row>)
[ERROR] method org.apache.spark.sql.SQLContext.createDataFrame(org.apache.spark.api.java.JavaRDD<org.apache.spark.sql.Row>,org.apache.spark.sql.types.StructType) is not applicable
[ERROR] (argument mismatch; java.util.List<org.apache.spark.mllib.regression.LabeledPoint> cannot be converted to org.apache.spark.api.java.JavaRDD<org.apache.spark.sql.Row>)
[ERROR] method org.apache.spark.sql.SQLContext.createDataFrame(org.apache.spark.rdd.RDD<?>,java.lang.Class<?>) is not applicable
[ERROR] (argument mismatch; java.util.List<org.apache.spark.mllib.regression.LabeledPoint> cannot be converted to org.apache.spark.rdd.RDD<?>)
[ERROR] method org.apache.spark.sql.SQLContext.createDataFrame(org.apache.spark.api.java.JavaRDD<?>,java.lang.Class<?>) is not applicable
[ERROR] (argument mismatch; java.util.List<org.apache.spark.mllib.regression.LabeledPoint> cannot be converted to org.apache.spark.api.java.JavaRDD<?>)

The code referred to by the error is straight from the example:

DataFrame training = sqlContext.createDataFrame(Arrays.asList(
      new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
      new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
      new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
      new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))
    ), LabeledPoint.class);

Solution

In addition to having the SQL and Spark contexts undefined, as @holden mentioned, the Java example you refer to lacks a crucial step: converting the list of LabeledPoints into an RDD (see http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds for more information).

To take care of this, you can use the sc.parallelize method from JavaSparkContext to convert the list into a JavaRDD object, which the createDataFrame method expects as a parameter. See the snippet below.

DataFrame training = sqlContext.createDataFrame(sc.parallelize(
        Arrays.asList(
            new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
            new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
            new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
            new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))
        )
    ), LabeledPoint.class);

Also, you need to specify the master URL in your code if you are running it standalone via Maven. You may simply use local[2] as the URL to run Spark locally with 2 threads.

SparkConf conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("Simple Estimator");

Normally, the master URL is provided to your program by the environment when you launch it with the spark-submit script, so you can skip setMaster in that case.
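For reference, a launch via spark-submit that supplies the master on the command line might look roughly like the following. The jar name is an assumption based on the artifactId and version in the pom above; use whatever mvn package actually produces in your target directory.

spark-submit \
  --class SimpleEstimator \
  --master local[2] \
  target/simple-estimator-1.0.jar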

Finally, you may want to disable Spark's verbose log messages so that the output from the algorithm is much easier to follow. You may skip this step, as it is optional.
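The complete code below does this by silencing System.err. As a sketch of an alternative, assuming the log4j 1.x API bundled with these Spark 1.5 artifacts is on the classpath, you could instead raise the log level for the noisy packages at the start of main:

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

// Show only errors from Spark and Akka internals (assumes log4j 1.x on the classpath)
Logger.getLogger("org").setLevel(Level.ERROR);
Logger.getLogger("akka").setLevel(Level.ERROR);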

See the complete code below with all the mentioned modifications included.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

//ADDITIONAL IMPORTS FOR MUTING SYS.ERR
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;

import java.util.Arrays;

public class SimpleEstimator {
    public static void main(String[] args) {
        //MUTE LOG MESSAGES  FOR READABILITY (OPTIONAL)
        System.setErr(new PrintStream(new OutputStream() {
            @Override
            public void write(int arg0) throws IOException {
                // keep empty
            }
        }));

        // added these as starting lines in class
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("Simple Estimator");

        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        DataFrame training = sqlContext.createDataFrame(sc.parallelize(
                Arrays.asList(
                        new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
                        new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
                        new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
                        new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))
                )
        ), LabeledPoint.class);

        LogisticRegression lr = new LogisticRegression();
        System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");

        lr.setMaxIter(10)
                .setRegParam(0.01);

        LogisticRegressionModel model1 = lr.fit(training);

        System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());

        ParamMap paramMap = new ParamMap()
                .put(lr.maxIter().w(20)) // Specify 1 Param.
                .put(lr.maxIter(), 30) // This overwrites the original maxIter.
                .put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.

        ParamMap paramMap2 = new ParamMap()
                .put(lr.probabilityCol().w("myProbability")); // Change output column name
        ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);

        LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
        System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());

        DataFrame test = sqlContext.createDataFrame(sc.parallelize(
                Arrays.asList(
                        new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
                        new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
                        new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))
                )
        ), LabeledPoint.class);

        DataFrame results = model2.transform(test);
        for (Row r: results.select("features", "label", "myProbability", "prediction").collect()) {
            System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
                    + ", prediction=" + r.get(3));
        }
    }
}
