如何使用Scala在Spark中使用DataSet? [英] How to work with DataSet in Spark using scala?
问题描述
我使用DataFrame加载CSV,然后转换为DataSet,但显示如下
I load my CSV using DataFrame then I converted to DataSet but it's shows like this
此行有多个标记:
-无法找到存储在数据集中的类型的编码器.导入
支持基本类型(Int,String等)和产品类型(案例类)
spark.implicits._在将来的版本中将添加对序列化其他类型的支持.
-方法的参数不足,例如:(隐式证据$ 2:
org.apache.spark.sql.Encoder [DataSet.spark.aacsv])org.apache.spark.sql.Dataset [DataSet.spark.aacsv].未指定值参数证据$ 2
Multiple markers at this line:
- Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing
spark.implicits._ Support for serializing other types will be added in future releases.
- not enough arguments for method as: (implicit evidence$2:
org.apache.spark.sql.Encoder[DataSet.spark.aacsv])org.apache.spark.sql.Dataset[DataSet.spark.aacsv]. Unspecified value parameter evidence$2
如何解决此问题? 我的代码是-
How to resolve this?. My code is -
case class aaCSV(
a: String,
b: String
)
object WorkShop {
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("readCSV")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val customSchema = StructType(Array(
StructField("a", StringType, true),
StructField("b", StringType, true)))
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(customSchema).load("/xx/vv/ss.csv")
df.printSchema()
df.show()
val googleDS = df.as[aaCSV]
googleDS.show()
}
}
现在我这样更改了主要功能-
Now I changed main function like this -
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("readCSV")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;
val sa = sqlContext.read.csv("/xx/vv/ss.csv").as[aaCSV]
sa.printSchema()
sa.show()
}
但是它引发错误-线程"main"中的异常org.apache.spark.sql.AnalysisException:给定输入列,无法解析"Adj_Close
":[_c1,_c2,_c5,_c4,_c6,_c3 ,_c0];第1行pos 7 .我该怎么办?
But it throws error - Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'Adj_Close
' given input columns: [_c1, _c2, _c5, _c4, _c6, _c3, _c0]; line 1 pos 7. What should i do ?
现在,我使用Spark Scheduler根据给定的时间间隔执行我的方法.但我引用了此链接- https://spark .apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application .请帮助我们.
Now I execute my method using based on given time interval using spark scheduler. But I refer this link - https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application. Kindly help us.
推荐答案
csv文件中是否包含标题(列名称)?如果是,请尝试添加
读取语句中的.option("header","true")
.
例子:
sqlContext.read.option("header","true").csv("/xx/vv/ss.csv").as[aaCSV]
.
Do you have header (column names) in your csv files ? If yes, try adding
.option("header","true")
in the read statement.
Example:
sqlContext.read.option("header","true").csv("/xx/vv/ss.csv").as[aaCSV]
.
以下博客针对数据框和数据集提供了不同的示例: http ://technippet.blogspot.in/2016/10/different-ways-of-creating.html
The below blog has different examples for Dataframes and Dataset:http://technippet.blogspot.in/2016/10/different-ways-of-creating.html
这篇关于如何使用Scala在Spark中使用DataSet?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!