How to work with DataSet in Spark using scala?

Question

I load my CSV using a DataFrame and then convert it to a Dataset, but it shows errors like this:

Multiple markers at this line:
- Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._. Support for serializing other types will be added in future releases.
- not enough arguments for method as: (implicit evidence$2: org.apache.spark.sql.Encoder[DataSet.spark.aacsv])org.apache.spark.sql.Dataset[DataSet.spark.aacsv]. Unspecified value parameter evidence$2

How do I resolve this? My code is:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}

case class aaCSV(
    a: String,
    b: String
)

object WorkShop {

  def main(args: Array[String]) = {
    val conf = new SparkConf()
      .setAppName("readCSV")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val customSchema = StructType(Array(
      StructField("a", StringType, true),
      StructField("b", StringType, true)))

    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .schema(customSchema)
      .load("/xx/vv/ss.csv")
    df.printSchema()
    df.show()
    val googleDS = df.as[aaCSV]
    googleDS.show()
  }

}

Now I changed the main function like this:

def main(args: Array[String]) = {
  val conf = new SparkConf()
    .setAppName("readCSV")
    .setMaster("local")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  val sa = sqlContext.read.csv("/xx/vv/ss.csv").as[aaCSV]
  sa.printSchema()
  sa.show()
}

But it throws an error - Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'Adj_Close' given input columns: [_c1, _c2, _c5, _c4, _c6, _c3, _c0]; line 1 pos 7. What should I do?

Now I want to execute my method at a given time interval using the Spark scheduler, and I am referring to this link - https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application. Kindly help us.

Answer

Do you have a header (column names) in your CSV file? If yes, try adding .option("header", "true") to the read statement. Example: sqlContext.read.option("header", "true").csv("/xx/vv/ss.csv").as[aaCSV].
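
A minimal sketch of that suggestion, assuming the aaCSV case class from the question and an existing SparkContext named sc; the import of sqlContext.implicits._ brings the Encoder into scope that the first error message complained about:

import org.apache.spark.sql.SQLContext

case class aaCSV(a: String, b: String)

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // provides Encoder[aaCSV] for .as[aaCSV]

val ds = sqlContext.read
  .option("header", "true") // use the first CSV line as column names (a, b)
  .csv("/xx/vv/ss.csv")
  .as[aaCSV]

ds.printSchema()
ds.show()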

The blog below has different examples for DataFrames and Datasets: http://technippet.blogspot.in/2016/10/different-ways-of-creating.html
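
As a rough illustration of the kind of examples that post covers, a Dataset can be built either from a local collection with toDS() or from a DataFrame with as[T]. A sketch using the Spark 2.x SparkSession API (the Person class and values here are illustrative, not from the question):

import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int)

val spark = SparkSession.builder().appName("dsExamples").master("local").getOrCreate()
import spark.implicits._ // needed for toDS() and Encoder[Person]

// 1. Dataset from a local collection
val ds1 = Seq(Person("alice", 30), Person("bob", 25)).toDS()

// 2. Dataset from a DataFrame, typed via a case class
val df = spark.createDataFrame(Seq(Person("carol", 40)))
val ds2 = df.as[Person]

ds1.show()
ds2.show()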
