How to load CSVs with timestamps in custom format?
Problem description
I have a timestamp field in a CSV file that I load into a DataFrame using the spark-csv library. The same piece of code works on my local machine with Spark 2.0 but throws an error on Azure Hortonworks HDP 3.5 and 3.6.
I have checked, and Azure HDInsight 3.5 uses the same Spark version, so I don't think the Spark version is the problem.
import org.apache.spark.sql.types._

val sourceFile = "C:\\2017\\datetest"
val sourceSchemaStruct = new StructType()
  .add("EventDate", DataTypes.TimestampType)
  .add("Name", DataTypes.StringType)

val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", "|")
  .option("mode", "FAILFAST")
  .option("inferSchema", "false")
  .option("dateFormat", "yyyy/MM/dd HH:mm:ss.SSS")
  .schema(sourceSchemaStruct)
  .load(sourceFile)
The full exception is as follows:
Caused by: java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
at java.sql.Timestamp.valueOf(Timestamp.java:237)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:179)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply$mcJ$sp(UnivocityParser.scala:142)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
at scala.util.Try.getOrElse(Try.scala:79)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:139)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:135)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$nullSafeDatum(UnivocityParser.scala:179)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:135)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:134)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:215)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:187)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:61)
... 27 more
The CSV file has only one row, as follows:
"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"
Recommended answer
TL;DR Use the timestampFormat option (not dateFormat).
I've managed to reproduce it in the latest Spark version, 2.3.0-SNAPSHOT (built from master).
// OS shell
$ cat so-43259485.csv
"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"
// spark-shell
scala> spark.version
res1: String = 2.3.0-SNAPSHOT
case class Event(EventDate: java.sql.Timestamp, Name: String)
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Event].schema
scala> spark
  .read
  .format("csv")
  .option("header", true)
  .option("mode", "FAILFAST")
  .option("delimiter", "|")
  .schema(schema)
  .load("so-43259485.csv")
  .show(false)
17/04/08 11:03:42 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
at java.sql.Timestamp.valueOf(Timestamp.java:237)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:167)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply$mcJ$sp(UnivocityParser.scala:146)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
at scala.util.Try.getOrElse(Try.scala:79)
The corresponding line in the Spark sources is the "root cause" of the issue:
Timestamp.valueOf(s)
Having read the javadoc of Timestamp.valueOf, you can learn that the argument should be:
timestamp in format yyyy-[m]m-[d]d hh:mm:ss[.f...]. The fractional seconds may be omitted. The leading zero for mm and dd may also be omitted.
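The javadoc rule can be checked outside Spark. The following is a minimal sketch in plain Scala (the object name TimestampValueOfDemo is made up for illustration) that feeds the value from the CSV file to Timestamp.valueOf, first as-is and then rewritten with dashes:

```scala
import java.sql.Timestamp
import scala.util.Try

// Illustrative object name; only java.sql.Timestamp is the real API here.
object TimestampValueOfDemo {
  // The value from the CSV file, in its original slash-separated form.
  val raw = "2016/12/19 00:43:27.583"

  // Timestamp.valueOf only accepts yyyy-[m]m-[d]d hh:mm:ss[.f...],
  // so the slash-separated date is rejected with IllegalArgumentException.
  val slashes: Try[Timestamp] = Try(Timestamp.valueOf(raw))

  // The same instant written with dashes is accepted,
  // with or without the fractional seconds.
  val dashes: Timestamp = Timestamp.valueOf("2016-12-19 00:43:27.583")
  val noFraction: Timestamp = Timestamp.valueOf("2016-12-19 00:43:27")

  def main(args: Array[String]): Unit = {
    println(slashes.isFailure) // true
    println(dashes)            // 2016-12-19 00:43:27.583
    println(noFraction)        // 2016-12-19 00:43:27.0
  }
}
```

So it is the slashes in the date part, not the fractional seconds, that Timestamp.valueOf rejects for this input.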
Note "The fractional seconds may be omitted", so a first idea is to cut them off: load EventDate as a String first, and convert it to Timestamp only after removing the unneeded fractional seconds.
val eventsAsString = spark.read.format("csv")
  .option("header", true)
  .option("mode", "FAILFAST")
  .option("delimiter", "|")
  .load("so-43259485.csv")
It turns out that for fields of TimestampType, Spark first uses the timestampFormat option if it is defined, and only if that does not apply falls back to the code that uses Timestamp.valueOf.
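The "custom format first, Timestamp.valueOf as a fallback" behaviour can be sketched in plain Scala. This is not Spark's code: parseTimestamp and TimestampFallbackSketch are hypothetical names, and java.text.SimpleDateFormat stands in (as an assumption) for whatever formatter Spark builds from the timestampFormat option.

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.util.Try

// Hypothetical sketch of the parse order; names are not part of Spark's API.
object TimestampFallbackSketch {
  // Try the user-supplied pattern first; only if that parse fails,
  // fall back to Timestamp.valueOf (which may itself throw).
  def parseTimestamp(value: String, pattern: String): Timestamp = {
    val format = new SimpleDateFormat(pattern)
    format.setLenient(false)
    Try(new Timestamp(format.parse(value).getTime))
      .getOrElse(Timestamp.valueOf(value))
  }

  def main(args: Array[String]): Unit = {
    val raw = "2016/12/19 00:43:27.583"

    // A matching pattern handles the value; the fallback is never reached.
    println(parseTimestamp(raw, "yyyy/MM/dd HH:mm:ss.SSS"))

    // A non-matching pattern forces the fallback, which rejects the slashes.
    println(Try(parseTimestamp(raw, "yyyy-MM-dd'T'HH:mm:ss")).isFailure)
  }
}
```

Under these assumptions, a pattern that does not match the data (or an option Spark does not consult for timestamps, such as dateFormat) leaves only the Timestamp.valueOf path, which is where the exception in the stack trace comes from.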
So the fix is just to use the timestampFormat option (not dateFormat!).
val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", "|")
  .option("mode", "FAILFAST")
  .option("inferSchema", "false")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss.SSS")
  .schema(sourceSchemaStruct)
  .load(sourceFile)
scala> df.show(false)
+-----------------------+----+
|EventDate              |Name|
+-----------------------+----+
|2016-12-19 00:43:27.583|adam|
+-----------------------+----+
Spark 2.1.0
Use schema inference in CSV by combining the inferSchema option with your custom timestampFormat.
It's important to trigger schema inference using inferSchema for timestampFormat to take effect.
val events = spark.read
  .format("csv")
  .option("header", true)
  .option("mode", "FAILFAST")
  .option("delimiter", "|")
  .option("inferSchema", true)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .load("so-43259485.csv")
scala> events.show(false)
+-------------------+----+
|EventDate          |Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+
scala> events.printSchema
root
|-- EventDate: timestamp (nullable = true)
|-- Name: string (nullable = true)
The "incorrect" initial version, kept for learning purposes only:
val events = eventsAsString
  .withColumn("date", split($"EventDate", " ")(0))
  .withColumn("date", translate($"date", "/", "-"))
  .withColumn("time", split($"EventDate", " ")(1))
  .withColumn("time", split($"time", "[.]")(0)) // <-- remove millis part
  .withColumn("EventDate", concat($"date", lit(" "), $"time")) // <-- make EventDate right
  .select($"EventDate" cast "timestamp", $"Name")
scala> events.printSchema
root
|-- EventDate: timestamp (nullable = true)
|-- Name: string (nullable = true)
scala> events.show
+-------------------+----+
|          EventDate|Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+
Spark 2.2.0
As of Spark 2.2 you can use the to_timestamp function to do the string-to-timestamp conversion.
scala> eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)
+-----------------------+----------------------------------------------------+
|EventDate              |to_timestamp(`EventDate`, 'yyyy/MM/dd HH:mm:ss.SSS')|
+-----------------------+----------------------------------------------------+
|2016/12/19 00:43:27.583|2016-12-19 00:43:27                                 |
+-----------------------+----------------------------------------------------+