value toDF is not a member of org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)]

Problem description

I am getting a compilation error when converting the pre-LDA transformation to a data frame using Scala in Spark 2.0. The specific code that throws the error is below:

val documents = PreLDAmodel.transform(mp_listing_lda_df)
  .select("docId","features")
  .rdd
  .map{ case Row(row_num: Long, features: MLVector) => (row_num, features) }
  .toDF()

The full compilation error is:

Error:(132, 8) value toDF is not a member of org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)]
possible cause: maybe a semicolon is missing before `value toDF'?
      .toDF()

Here is the complete code:

import java.io.FileInputStream
import java.sql.{DriverManager, ResultSet}
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, RegexTokenizer, StopWordsRemover}
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.clustering.{LDA => oldLDA}
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object MPClassificationLDA {
  /*Start: Configuration variable initialization*/
  val props = new Properties
  val fileStream = new FileInputStream("U:\\JIRA\\MP_Classification\\target\\classes\\mpclassification.properties")
  props.load(fileStream)
  val mpExtract = props.getProperty("mpExtract").toString
  val shard6_db_server_name = props.getProperty("shard6_db_server_name").toString
  val shard6_db_user_id = props.getProperty("shard6_db_user_id").toString
  val shard6_db_user_pwd = props.getProperty("shard6_db_user_pwd").toString
  val mp_output_file = props.getProperty("mp_output_file").toString
  val spark_warehouse_path = props.getProperty("spark_warehouse_path").toString
  val rf_model_file_path = props.getProperty("rf_model_file_path").toString
  val windows_hadoop_home = props.getProperty("windows_hadoop_home").toString
  val lda_vocabulary_size = props.getProperty("lda_vocabulary_size").toInt
  val pre_lda_model_file_path = props.getProperty("pre_lda_model_file_path").toString
  val lda_model_file_path = props.getProperty("lda_model_file_path").toString
  fileStream.close()
  /*End: Configuration variable initialization*/

  val conf = new SparkConf().set("spark.sql.warehouse.dir", spark_warehouse_path)

  def main(arg: Array[String]): Unit = {
    //SQL Query definition and parameter values as parameter upon executing the Object
    val cont_id = "14211599"
    val top = "100000"
    val start_date = "2016-05-01"
    val end_date = "2016-06-01"

    val mp_spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("MPClassificationLoadLDA")
      .config(conf)
      .getOrCreate()
    MPClassificationLDACalculation(mp_spark, cont_id, top, start_date, end_date)
    mp_spark.stop()
  }

  private def MPClassificationLDACalculation
  (mp_spark: SparkSession
   ,cont_id: String
   ,top: String
   ,start_date: String
   ,end_date: String
  ): Unit = {

    //DB connection definition
    def createConnection() = {
      Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver").newInstance();
      DriverManager.getConnection("jdbc:sqlserver://" + shard6_db_server_name + ";user=" + shard6_db_user_id + ";password=" + shard6_db_user_pwd);
    }

    //DB Field Names definition
    def extractvalues(r: ResultSet) = {
      Row(r.getString(1),r.getString(2))
    }

    //Prepare SQL Statement with parameter value replacement
    val query = """SELECT docId = audt_id, text = auction_title FROM brands6.dbo.uf_ds_marketplace_classification_listing(@cont_id, @top, '@start_date', '@end_date') WHERE ? < ? OPTION(RECOMPILE);"""
      .replaceAll("@cont_id", cont_id)
      .replaceAll("@top", top)
      .replaceAll("@start_date", start_date)
      .replaceAll("@end_date", end_date)
      .stripMargin

    //Connect to Source DB and execute the Prepared SQL Steatement
    val mpDataRDD = new JdbcRDD(mp_spark.sparkContext
      ,createConnection
      ,query
      ,lowerBound = 0
      ,upperBound = 10000000
      ,numPartitions = 1
      ,mapRow = extractvalues)

    val schema_string = "docId,text"
    val fields = StructType(schema_string.split(",")
      .map(fieldname => StructField(fieldname, StringType, true)))

    //Create Data Frame using format identified through schema_string
    val mpDF = mp_spark.createDataFrame(mpDataRDD, fields)
    mpDF.collect()

    val mp_listing_tmp = mpDF.selectExpr("cast(docId as long) docId", "text")
    mp_listing_tmp.printSchema()
    println(mp_listing_tmp.first)

    val mp_listing_lda_df = mp_listing_tmp.withColumn("docId", mp_listing_tmp("docId"))
    mp_listing_lda_df.printSchema()

    val tokenizer = new RegexTokenizer()
      .setInputCol("text")
      .setOutputCol("rawTokens")
      .setMinTokenLength(2)

    val stopWordsRemover = new StopWordsRemover()
      .setInputCol("rawTokens")
      .setOutputCol("tokens")

    val vocabSize = 4000

    val countVectorizer = new CountVectorizer()
      .setVocabSize(vocabSize)
      .setInputCol("tokens")
      .setOutputCol("features")

    val PreLDApipeline = new Pipeline()
      .setStages(Array(tokenizer, stopWordsRemover, countVectorizer))

    val PreLDAmodel = PreLDApipeline.fit(mp_listing_lda_df)
    //comment out after saving it the first time
    PreLDAmodel.write.overwrite().save(pre_lda_model_file_path)

    val documents = PreLDAmodel.transform(mp_listing_lda_df)
      .select("docId","features")
      .rdd
      .map{ case Row(row_num: Long, features: MLVector) => (row_num, features) }
      .toDF()

    //documents.printSchema()
    val numTopics: Int = 20
    val maxIterations: Int = 100

    //note the FeaturesCol need to be set
    val lda = new LDA()
      .setOptimizer("em")
      .setK(numTopics)
      .setMaxIter(maxIterations)
      .setFeaturesCol(("_2"))

    val vocabArray = PreLDAmodel.stages(2).asInstanceOf[CountVectorizerModel].vocabulary
  }
}

I am thinking that it is related to conflicts in the imports section of the code. Appreciate any help.

Recommended answer

Two things need to be done:

Import implicits: note that this should be done only after an instance of org.apache.spark.sql.SQLContext is created. It should be written as:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._
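
In Spark 2.0, where a SparkSession is already created (mp_spark in the code above), the same implicits can be imported directly from that session instead of constructing a separate SQLContext. A minimal sketch, assuming the names from the question's code:

// Inside MPClassificationLDACalculation, once mp_spark is in scope
import mp_spark.implicits._

val documents = PreLDAmodel.transform(mp_listing_lda_df)
  .select("docId", "features")
  .rdd
  .map { case Row(row_num: Long, features: MLVector) => (row_num, features) }
  .toDF("docId", "features")  // toDF now resolves via the session's implicits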

Move the case class outside of the method: a case class, by use of which you define the schema of the DataFrame, should be defined outside of the method that needs it. You can read more about it here: https://issues.scala-lang.org/browse/SI-6649
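
For example, a case class that names the columns of the converted RDD would be declared at the top level, next to the MPClassificationLDA object, rather than inside MPClassificationLDACalculation. A minimal sketch; the DocFeatures name is hypothetical, and the session implicits from the previous step are assumed to be in scope:

// Declared outside any method, at the top level of the file
case class DocFeatures(docId: Long, features: MLVector)

// Inside the method:
val documents = PreLDAmodel.transform(mp_listing_lda_df)
  .select("docId", "features")
  .rdd
  .map { case Row(row_num: Long, features: MLVector) => DocFeatures(row_num, features) }
  .toDF()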
