Spark - load CSV file as DataFrame?
Question
I would like to read a CSV in Spark, convert it to a DataFrame, and store it in HDFS with df.registerTempTable("table_name")
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
The error I got:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
What is the right command to load a CSV file as a DataFrame in Apache Spark?
Answer
Spark SQL provides built-in support for only 3 types of data sources:
- Parquet (the default)
- JSON
- JDBC
For CSV, there is a separate library: spark-csv (https://github.com/databricks/spark-csv)
Its CsvContext class provides a csvFile method which can be used to load CSV files.
val cars = sqlContext.csvFile("cars.csv") // uses implicit class CsvContext
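If you prefer the DataFrameReader API over the CsvContext implicit, spark-csv also registers itself as a data-source format. A minimal sketch, assuming a Spark 1.x shell with the spark-csv package on the classpath (the file path is a placeholder):

```scala
// Spark 1.x, launched with e.g. --packages com.databricks:spark-csv_2.10:1.5.0
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")      // treat the first line of the file as a header
  .option("inferSchema", "true") // infer column types instead of all-strings
  .load("cars.csv")              // placeholder path

df.registerTempTable("table_name") // make it queryable via SQL, as the question asked
```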
Edit: As of Spark 2.0, CSV support is part of core Spark and no longer requires a separate library, so you can simply do, for example:
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
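For completeness, a Scala sketch of the same Spark 2.x flow, including registering the result for SQL queries as the question intended (createOrReplaceTempView supersedes the deprecated registerTempTable; the path is a placeholder):

```scala
// Spark 2.x: `spark` is the SparkSession provided by spark-shell
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("csvfile.csv")            // shorthand for format("csv").load(...)

df.createOrReplaceTempView("table_name") // 2.x replacement for registerTempTable
```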