Reading CSV into a Spark Dataframe with timestamp and date types

Problem Description

It's CDH with Spark 1.6.

I am trying to import this hypothetical CSV into an Apache Spark DataFrame:

$ hadoop fs -cat test.csv
a,b,c,2016-09-09,a,2016-11-11 09:09:09.0,a
a,b,c,2016-09-10,a,2016-11-11 09:09:10.0,a

I use the databricks-csv jar.

val textData = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss")
    .option("inferSchema", "true")
    .option("nullValue", "null")
    .load("test.csv")

I use inferSchema to make the schema for the resulting DataFrame. The printSchema() function gives the following output for the code above:

scala> textData.printSchema()
root
 |-- C0: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: string (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: timestamp (nullable = true)
 |-- C6: string (nullable = true)

scala> textData.show()
+---+---+---+----------+---+--------------------+---+
| C0| C1| C2|        C3| C4|                  C5| C6|
+---+---+---+----------+---+--------------------+---+
|  a|  b|  c|2016-09-09|  a|2016-11-11 09:09:...|  a|
|  a|  b|  c|2016-09-10|  a|2016-11-11 09:09:...|  a|
+---+---+---+----------+---+--------------------+---+

The C3 column has String type. I want C3 to have Date type. To get it to Date type I tried the following code:

val textData = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd")
    .option("inferSchema", "true")
    .option("nullValue", "null")
    .load("test.csv")

scala> textData.printSchema
root
 |-- C0: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: timestamp (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: timestamp (nullable = true)
 |-- C6: string (nullable = true)

scala> textData.show()
+---+---+---+--------------------+---+--------------------+---+
| C0| C1| C2|                  C3| C4|                  C5| C6|
+---+---+---+--------------------+---+--------------------+---+
|  a|  b|  c|2016-09-09 00:00:...|  a|2016-11-11 00:00:...|  a|
|  a|  b|  c|2016-09-10 00:00:...|  a|2016-11-11 00:00:...|  a|
+---+---+---+--------------------+---+--------------------+---+

The only difference between this code and the first block is the dateFormat option line (I use "yyyy-MM-dd" instead of "yyyy-MM-dd HH:mm:ss"). Now I get both C3 and C5 as timestamps (C3 is still not a date). But for C5, the HH:mm:ss part is ignored and shows up as zeroes in the data.

Ideally I want C3 to be of type Date, and C5 to be of type Timestamp with its HH:mm:ss part not ignored. My solution right now looks like this: I make the CSV by pulling data in parallel from my DB, and I make sure that I pull all dates as timestamps (not ideal). So the test CSV now looks like this:

$ hadoop fs -cat new-test.csv
a,b,c,2016-09-09 00:00:00,a,2016-11-11 09:09:09.0,a
a,b,c,2016-09-10 00:00:00,a,2016-11-11 09:09:10.0,a

This is my final working code:

val textData = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss")
    .schema(finalSchema)
    .option("nullValue", "null")
    .load("new-test.csv")

Here, I use the complete timestamp format ("yyyy-MM-dd HH:mm:ss") in dateFormat. I manually create the finalSchema instance, where C3 is DateType and C5 is TimestampType (Spark SQL types), and apply it with the schema() function. The output looks as follows:

scala> finalSchema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(C0,StringType,true), StructField(C1,StringType,true), StructField(C2,StringType,true), StructField(C3,DateType,true), StructField(C4,StringType,true), StructField(C5,TimestampType,true), StructField(C6,StringType,true))
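
For reference, an equivalent schema can be constructed by hand like this (a minimal sketch; the field names and types are taken from the res4 output above):

import org.apache.spark.sql.types._

// C3 as DateType, C5 as TimestampType, everything else StringType,
// matching the StructType shown in res4.
val finalSchema = StructType(Seq(
  StructField("C0", StringType, true),
  StructField("C1", StringType, true),
  StructField("C2", StringType, true),
  StructField("C3", DateType, true),
  StructField("C4", StringType, true),
  StructField("C5", TimestampType, true),
  StructField("C6", StringType, true)))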

scala> textData.printSchema()
root
 |-- C0: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: date (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: timestamp (nullable = true)
 |-- C6: string (nullable = true)


scala> textData.show()
+---+---+---+----------+---+--------------------+---+
| C0| C1| C2|        C3| C4|                  C5| C6|
+---+---+---+----------+---+--------------------+---+
|  a|  b|  c|2016-09-09|  a|2016-11-11 09:09:...|  a|
|  a|  b|  c|2016-09-10|  a|2016-11-11 09:09:...|  a|
+---+---+---+----------+---+--------------------+---+

Is there an easier way to parse a CSV file (one with date and timestamp types) into a Spark dataframe?

Relevant Links:
http://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options
https://github.com/databricks/spark-csv

Answer

With the inferSchema option, non-trivial cases will probably not return the expected result. As you can see in InferSchema.scala:

if (field == null || field.isEmpty || field == nullValue) {
  typeSoFar
} else {
  // Note: there is no DateType case below, so inference can never produce a date.
  typeSoFar match {
    case NullType => tryParseInteger(field)
    case IntegerType => tryParseInteger(field)
    case LongType => tryParseLong(field)
    case DoubleType => tryParseDouble(field)
    case TimestampType => tryParseTimestamp(field)
    case BooleanType => tryParseBoolean(field)
    case StringType => StringType
    case other: DataType =>
      throw new UnsupportedOperationException(s"Unexpected data type $other")
  }
}

It will only try to match each column against TimestampType, never DateType, so an "out of the box" solution is not possible in this case. In my experience, the "easier" solution is to define the schema directly with the needed types; this avoids the infer option settling on a type that only fits the evaluated RDD rather than the entire data. Your final schema is an efficient solution.
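
If regenerating the CSV with full timestamps is not an option, a workaround in the same spirit is to keep the original test.csv, let C5 be inferred as a timestamp via the full dateFormat, and cast C3 from string to date after the read. A minimal sketch (assuming Spark 1.6, whose to_date function truncates a parsed value to DateType):

import org.apache.spark.sql.functions.to_date

// Infer as in the first block: C5 comes back as timestamp, C3 as string.
val raw = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .option("delimiter", ",")
  .option("dateFormat", "yyyy-MM-dd HH:mm:ss")
  .option("inferSchema", "true")
  .option("nullValue", "null")
  .load("test.csv")

// Cast C3 to DateType after the fact.
val textData = raw.withColumn("C3", to_date(raw("C3")))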
