使用Spark无法将CSV数据正确加载为Parquet [英] Csv Data is not loading properly as Parquet using Spark
问题描述
我在配置单元
CREATE TABLE tab_data (
rec_id INT,
rec_name STRING,
rec_value DECIMAL(3,1),
rec_created TIMESTAMP
) STORED AS PARQUET;
,我想用这些 .csv 文件中的数据填充该表
and I want to populate this table with data in .csv files like these
10|customer1|10.0|2016-09-07 08:38:00.0
20|customer2|24.0|2016-09-08 10:45:00.0
30|customer3|35.0|2016-09-10 03:26:00.0
40|customer1|46.0|2016-09-11 08:38:00.0
50|customer2|55.0|2016-09-12 10:45:00.0
60|customer3|62.0|2016-09-13 03:26:00.0
70|customer1|72.0|2016-09-14 08:38:00.0
80|customer2|23.0|2016-09-15 10:45:00.0
90|customer3|30.0|2016-09-16 03:26:00.0
使用 Spark 和 Scala 并使用以下代码
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType, TimestampType}
object MainApp {
val spark = SparkSession
.builder()
.appName("MainApp")
.master("local[*]")
.config("spark.sql.shuffle.partitions","200")
.getOrCreate()
val sc = spark.sparkContext
val inputPath = "hdfs://host.hdfs:8020/..../tab_data.csv"
val outputPath = "hdfs://host.hdfs:8020/...../warehouse/test.db/tab_data"
def main(args: Array[String]): Unit = {
try {
val DecimalType = DataTypes.createDecimalType(3, 1)
/**
* schema
*/
val schema = StructType(List(StructField("rec_id", IntegerType, true), StructField("rec_name",StringType, true),
StructField("rec_value",DecimalType),StructField("rec_created",TimestampType, true)))
/**
* Reading the data from HDFS
*/
val data = spark
.read
.option("sep","|")
.schema(schema)
.csv(inputPath)
data.show(truncate = false)
data.schema.printTreeString()
/**
* Writing the data as Parquet
*/
data
.write
.mode(SaveMode.Append)
.parquet(outputPath)
} finally {
sc.stop()
spark.stop()
}
}
}
问题是我得到了这个输出
The problem is that I am getting this output
+------+--------+---------+-----------+
|rec_id|rec_name|rec_value|rec_created|
+------+--------+---------+-----------+
|null |null |null |null |
|null |null |null |null |
|null |null |null |null |
|null |null |null |null |
|null |null |null |null |
|null |null |null |null |
|null |null |null |null |
|null |null |null |null |
|null |null |null |null |
|null |null |null |null |
|null |null |null |null |
|null |null |null |null |
|null |null |null |null |
root
|-- rec_id: integer (nullable = true)
|-- rec_name: string (nullable = true)
|-- rec_value: decimal(3,1) (nullable = true)
|-- rec_created: timestamp (nullable = true)
架构很好,但是数据未正确加载到表中
The schema is fine but the data is not loading properly in the table
SELECT * FROM tab_data;
+------------------+--------------------+---------------------+-----------------------+--+
| tab_data.rec_id | tab_data.rec_name | tab_data.rec_value | tab_data.rec_created |
+------------------+--------------------+---------------------+-----------------------+--+
| NULL | NULL | NULL | NULL |
| NULL | NULL | NULL | NULL |
| NULL | NULL | NULL | NULL |
| NULL | NULL | NULL | NULL |
| NULL | NULL | NULL | NULL |
| NULL | NULL | NULL | NULL |
| NULL | NULL | NULL | NULL |
| NULL | NULL | NULL | NULL |
| NULL | NULL | NULL | NULL |
我在做什么错了?
我是 Spark 的新手,我们将不胜感激.
I'm new with Spark and some help would be appreciated.
推荐答案
由于所有类型 String
的列之一均无法获取所有列的 null
值转换为 Timestamp
类型.
You are getting null
values in all columns because one of the column of type String
is not able convert to Timestamp
type.
要将字符串转换为时间戳类型,请在加载csv时使用此 option("timestampFormat","yyyy-MM-dd HH:mm:ss.S")
选项指定时间戳格式数据.
To convert string to timestamp type, specify timestamp format by using this option("timestampFormat","yyyy-MM-dd HH:mm:ss.S")
option while loading csv data.
检查以下代码.
架构
scala> val schema = StructType(List(
StructField("rec_id", IntegerType, true),
StructField("rec_name",StringType, true),
StructField("rec_value",DecimalType(3,1)),
StructField("rec_created",TimestampType, true))
)
加载CSV数据
scala> val df = spark
.read
.option("sep","|")
.option("inferSchema","true")
.option("timestampFormat","yyyy-MM-dd HH:mm:ss.S")
.schema(schema)
.csv("/tmp/sample")
scala> df.show(false)
+------+---------+---------+-------------------+
|rec_id|rec_name |rec_value|rec_created |
+------+---------+---------+-------------------+
|10 |customer1|10.0 |2016-09-07 08:38:00|
|20 |customer2|24.0 |2016-09-08 10:45:00|
|30 |customer3|35.0 |2016-09-10 03:26:00|
|40 |customer1|46.0 |2016-09-11 08:38:00|
|50 |customer2|55.0 |2016-09-12 10:45:00|
|60 |customer3|62.0 |2016-09-13 03:26:00|
|70 |customer1|72.0 |2016-09-14 08:38:00|
|80 |customer2|23.0 |2016-09-15 10:45:00|
|90 |customer3|30.0 |2016-09-16 03:26:00|
+------+---------+---------+-------------------+
已更新
由于表是托管表,因此不需要设置所有这些参数,可以使用 insertInto
函数将数据插入表中.
Since table is managed table, You don't need to set all those parameters, You can use insertInto
function to insert the data into table.
df.write.mode("append").insertInto("tab_data")
这篇关于使用Spark无法将CSV数据正确加载为Parquet的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!