How to load CSVs with timestamps in custom format?


Question

I have a timestamp field in a CSV file that I load to a DataFrame using the spark-csv library. The same piece of code works on my local machine with Spark 2.0 but throws an error on Azure Hortonworks HDP 3.5 and 3.6.

I have checked, and Azure HDInsight 3.5 is also using the same Spark version, so I don't think it's a problem with the Spark version.

import org.apache.spark.sql.types._
val sourceFile = "C:\\2017\\datetest"
val sourceSchemaStruct = new StructType()
  .add("EventDate",DataTypes.TimestampType)
  .add("Name",DataTypes.StringType)
val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .option("delimiter","|")
  .option("mode","FAILFAST")
  .option("inferSchema","false")
  .option("dateFormat","yyyy/MM/dd HH:mm:ss.SSS")
  .schema(sourceSchemaStruct)
  .load(sourceFile)

The whole exception is as follows:

Caused by: java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
  at java.sql.Timestamp.valueOf(Timestamp.java:237)
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:179)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply$mcJ$sp(UnivocityParser.scala:142)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
  at scala.util.Try.getOrElse(Try.scala:79)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:139)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:135)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$nullSafeDatum(UnivocityParser.scala:179)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:135)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:134)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:215)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:187)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
  at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:61)
  ... 27 more

The CSV file has only one data row, as follows:

"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"

Answer

TL;DR Use the timestampFormat option (not dateFormat).

I've managed to reproduce it in the latest Spark version 2.3.0-SNAPSHOT (built from master).

// OS shell
$ cat so-43259485.csv
"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"

// spark-shell
scala> spark.version
res1: String = 2.3.0-SNAPSHOT

case class Event(EventDate: java.sql.Timestamp, Name: String)
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Event].schema

scala> spark
  .read
  .format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .schema(schema)
  .load("so-43259485.csv")
  .show(false)
17/04/08 11:03:42 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
    at java.sql.Timestamp.valueOf(Timestamp.java:237)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:167)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply$mcJ$sp(UnivocityParser.scala:146)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
    at scala.util.Try.getOrElse(Try.scala:79)

The corresponding line in the Spark sources is the "root cause" of the issue:

Timestamp.valueOf(s)

Having read the javadoc of Timestamp.valueOf, you can learn that the argument should be a:

timestamp in format yyyy-[m]m-[d]d hh:mm:ss[.f...]. The fractional seconds may be omitted. The leading zero for mm and dd may also be omitted.
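
To see that restriction in isolation, here is a minimal sketch (plain Scala, no Spark involved) of how java.sql.Timestamp.valueOf reacts to the two formats:

import java.sql.Timestamp

// Accepted: the JDBC escape format yyyy-[m]m-[d]d hh:mm:ss[.f...]
Timestamp.valueOf("2016-12-19 00:43:27.583")

// Rejected: the slash-separated format from the CSV file
try {
  Timestamp.valueOf("2016/12/19 00:43:27.583")
} catch {
  case e: IllegalArgumentException =>
    // prints: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
    println(e.getMessage)
}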

Note "The fractional seconds may be omitted", so let's cut them off by first loading EventDate as a String, and converting it to Timestamp only after removing the unneeded fractional seconds.

val eventsAsString = spark.read.format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .load("so-43259485.csv")

It turns out that for fields of TimestampType, Spark uses the timestampFormat option first if it is defined, and only falls back to the code path that uses Timestamp.valueOf when it is not.

It turns out the fix is just to use the timestampFormat option (not dateFormat!).

val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .option("delimiter","|")
  .option("mode","FAILFAST")
  .option("inferSchema","false")
  .option("timestampFormat","yyyy/MM/dd HH:mm:ss.SSS")
  .schema(sourceSchemaStruct)
  .load(sourceFile)
scala> df.show(false)
+-----------------------+----+
|EventDate              |Name|
+-----------------------+----+
|2016-12-19 00:43:27.583|adam|
+-----------------------+----+

Spark 2.1.0

Use schema inference in CSV using the inferSchema option with your custom timestampFormat.

It's important to trigger schema inference using inferSchema for timestampFormat to take effect.

val events = spark.read
  .format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .option("inferSchema", true)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .load("so-43259485.csv")

scala> events.show(false)
+-------------------+----+
|EventDate          |Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+

scala> events.printSchema
root
 |-- EventDate: timestamp (nullable = true)
 |-- Name: string (nullable = true)


The initial "incorrect" version is left below for learning purposes:

import org.apache.spark.sql.functions._  // split, translate, concat, lit

val events = eventsAsString
  .withColumn("date", split($"EventDate", " ")(0))
  .withColumn("date", translate($"date", "/", "-"))
  .withColumn("time", split($"EventDate", " ")(1))
  .withColumn("time", split($"time", "[.]")(0))    // <-- remove millis part
  .withColumn("EventDate", concat($"date", lit(" "), $"time")) // <-- make EventDate right
  .select($"EventDate" cast "timestamp", $"Name")

scala> events.printSchema
root
 |-- EventDate: timestamp (nullable = true)
 |-- Name: string (nullable = true)

scala> events.show
+-------------------+----+
|          EventDate|Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+

Spark 2.2.0

As of Spark 2.2 you can use the to_timestamp function to do the string-to-timestamp conversion.

import org.apache.spark.sql.functions.to_timestamp

scala> eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)
+-----------------------+----------------------------------------------------+
|EventDate              |to_timestamp(`EventDate`, 'yyyy/MM/dd HH:mm:ss.SSS')|
+-----------------------+----------------------------------------------------+
|2016/12/19 00:43:27.583|2016-12-19 00:43:27                                 |
+-----------------------+----------------------------------------------------+
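
For completeness, a hedged variant of the same idea (the typedEvents name is just for illustration): overwrite EventDate in place, so the DataFrame keeps its original column name with the parsed type.

import org.apache.spark.sql.functions.to_timestamp

// Replace the string column with its parsed timestamp counterpart
val typedEvents = eventsAsString
  .withColumn("EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS"))

// typedEvents.printSchema should now report EventDate as timestamp (nullable = true)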
