value toDF is not a member of org.apache.spark.rdd.RDD[Weather]


Problem description

This code works fine in spark-shell, but it fails in the IntelliJ IDE.

This is the error message:

Error:(59, 7) value toDF is not a member of org.apache.spark.rdd.RDD[Weather]
possible cause: maybe a semicolon is missing before `value toDF'? }.toDF()
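The "maybe a semicolon is missing" hint is misleading: toDF is not defined on RDD at all, but is added by an implicit conversion that spark-shell imports automatically at startup, which is why the same code compiles there and not in a standalone build. A minimal self-contained sketch of the difference (Point and ToDfDemo are illustrative names, not from the question):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative case class; must live outside the method that calls toDF
case class Point(x: Double, y: Double)

object ToDfDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("toDfDemo"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Commenting out this import reproduces the exact error above:
    //   value toDF is not a member of org.apache.spark.rdd.RDD[Point]
    import sqlContext.implicits._
    val df = sc.parallelize(Seq(Point(1.0, 2.0))).toDF()
    df.show()
  }
}

The complete code from the question: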

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.ParamGridBuilder

import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.sql.DataFrame

case class Weather(
                date: String,
                day_of_week: String,
                avg_temp: Double,
                max_temp: Double,
                min_temp: Double,
                rainfall: Double,
                daylight_hours: Double,
                max_depth_snowfall: Double,
                total_snowfall: Double,
                solar_radiation: Double,
                mean_wind_speed: Double,
                max_wind_speed: Double,
                max_instantaneous_wind_speed: Double,
                avg_humidity: Double,
                avg_cloud_cover: Double)

case class Tracffic(date: String, down: Double, up: Double)
case class Predict(describe: String, avg_temp: Double, rainfall: Double, weekend: Double, total_snowfall: Double)

object weather2 {
  def main(args : Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("weather2")
    val sc = new SparkContext(conf)

    // Read the weather CSV and remove its header line
    val weatherCSVTmp = sc.textFile("D:\\shared\\weather.csv")
    val weatherHeader = sc.parallelize(Array(weatherCSVTmp.first))
    val weatherCSV = weatherCSVTmp.subtract(weatherHeader)
    val weatherDF = weatherCSV.map(_.split(",")).map { p =>
      Weather(p(0),
        p(1),
        p(2).trim.toDouble,
        p(3).trim.toDouble,
        p(4).trim.toDouble,
        p(5).trim.toDouble,
        p(6).trim.toDouble,
        p(7).trim.toDouble,
        p(8).trim.toDouble,
        p(9).trim.toDouble,
        p(10).trim.toDouble,
        p(11).trim.toDouble,
        p(12).trim.toDouble,
        p(13).trim.toDouble,
        p(14).trim.toDouble)
    }.toDF() // error

    // Read the traffic CSV the same way
    val tracfficCSVTmp = sc.textFile("D:\\shared\\tracffic_volume.csv")
    val tracfficHeader = sc.parallelize(Array(tracfficCSVTmp.first))
    val tracfficCSV = tracfficCSVTmp.subtract(tracfficHeader)
    val tracfficDF = tracfficCSV.map(_.split(",")).map { p =>
      Tracffic(p(0),
        p(1).trim.toDouble,
        p(2).trim.toDouble)
    }.toDF() // error

    // Join on date and add a weekend indicator column
    val tracfficAndWeatherDF = tracfficDF.join(weatherDF, "date")
    val isWeekend = udf((t: String) =>
      t match {
        case x if x.contains("Sunday") => 1d
        case x if x.contains("Saturday") => 1d
        case _ => 0d
      })
    val replacedtracfficAndWeatherDF = tracfficAndWeatherDF.withColumn(
      "weekend", isWeekend(tracfficAndWeatherDF("day_of_week"))
    ).drop("day_of_week")

    // Assemble the feature vector and scale it
    val va = new VectorAssembler().setInputCols {
      Array("avg_temp", "weekend", "rainfall")
    }.setOutputCol("input_vec")
    val scaler = new StandardScaler().setInputCol(va.getOutputCol).setOutputCol("scaled_vec")
    va.explainParams
    scaler.explainParams

    // Predict the "down" column
    val lr = new LinearRegression().setMaxIter(10).setFeaturesCol(scaler.getOutputCol).setLabelCol("down")
    val pipeline = new Pipeline().setStages(Array(va, scaler, lr))
    val pipelineModel = pipeline.fit(replacedtracfficAndWeatherDF)
    val test = sc.parallelize(Seq(
      Predict("Ussally Day", 20.0, 20, 0, 0),
      Predict("Weekend", 20.0, 20, 1, 0),
      Predict("Cold day", 3.0, 20, 0, 20)
    )).toDF // error
    val predictedDataDF = pipelineModel.transform(test)
    val desAndPred = predictedDataDF.select("describe", "prediction").collect()
    desAndPred.foreach {
      case Row(describe: String, prediction: Double) =>
        println(s"($describe) -> prediction = $prediction")
    }
  }
}

What is the problem? The libraries are Spark 2.11.x. Would you help me?

Recommended answer

Add the following code and try again. In spark-shell these implicits are already imported for you at startup, which is why the same code compiles there; in an IDE build you have to bring them into scope yourself:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
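In the question's code, these two lines belong right after the SparkContext is created, before the first .toDF() call:

val conf = new SparkConf().setMaster("local").setAppName("weather2")
val sc = new SparkContext(conf)

// implicits supplies the conversion (rddToDataFrameHolder in Spark 1.x)
// that makes .toDF() available on an RDD of a case class
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

If the project is actually on Spark 2.x, the same fix is usually written through SparkSession instead; a sketch, assuming the rest of the code stays unchanged:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local")
  .appName("weather2")
  .getOrCreate()
val sc = spark.sparkContext // keeps the existing RDD code working

// Spark 2.x equivalent of sqlContext.implicits._
import spark.implicits._

Either way, the case classes must stay at top level, as they already do in the question; defining them inside main breaks the implicit machinery with a different error about a missing TypeTag.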
