Spark 2 time type from CSV causing exception with take(5)


Problem Description

I'm very new to both Spark and Scala, and am trying to load a CSV similar to:

A,09:33:57.570
B,09:43:02.577
...

The only temporal type I see in org.apache.spark.sql.types is TimestampType, so I am loading the CSV with:

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("A", StringType, true),
  StructField("time", TimestampType, true)))

val table = spark.read.option("header","false").option("inferSchema","false").schema(schema).csv("../table.csv")

This seems to work fine until I do table.show() or table.take(5), etc., in which case I get the following exception:

scala> table.show()
16/10/07 16:32:25 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException
        at java.sql.Date.valueOf(Date.java:143)
        at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137)
        at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:287)
        at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:115)
        at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:84)
        at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$1.apply(CSVFileFormat.scala:125)
        at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$1.apply(CSVFileFormat.scala:124)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

Is there a preferred way of having time data stored within spark? I have also tried leaving it as a string and mapping LocalTime.parse() from java.time on each value, but that fails saying that there is no Encoder for the type.
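
A sketch of what that attempt might look like (a reconstruction, not the exact code from the question; Spark 2 provides no implicit Encoder for java.time.LocalTime, so the map fails to compile):

import java.time.LocalTime

// With time read as StringType, mapping each value to a LocalTime fails
// because Spark cannot find an Encoder for java.time.LocalTime:
val parsed = table.map(row => LocalTime.parse(row.getString(1)))
// error: Unable to find encoder for type stored in a Dataset ...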

Solution

There is no SQL type which can directly accommodate time-of-day data, so probably the best you can do is to use LongType, parsing with unix_timestamp. Read the data with the time column as a string:

 StructField("time", StringType, true)))

It should result in a data frame similar to:

val df = Seq(("A", "09:33:57.570"), ("B", "09:43:02.577")).toDF("A", "time")

Define a simple date format:

val format = "HH:mm:ss.SSS"

and use it for parsing:

df.withColumn("seconds", unix_timestamp($"time", format))

Unfortunately this is a lossy transformation: unix_timestamp has only second precision, so the .570 and .577 millisecond parts are dropped. Note also that the result is in epoch seconds, shifted by the session time zone offset, which is why 09:33:57 yields 30837 here rather than 9*3600 + 33*60 + 57 = 34437.

+---+------------+-------+
|  A|        time|seconds|
+---+------------+-------+
|  A|09:33:57.570|  30837|
|  B|09:43:02.577|  31382|
+---+------------+-------+
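
To make the loss visible, the seconds can be formatted back into a time-of-day string with from_unixtime (a sketch; like unix_timestamp, from_unixtime applies the session time zone, so the two offsets cancel out):

import org.apache.spark.sql.functions.{unix_timestamp, from_unixtime}

df.withColumn("seconds", unix_timestamp($"time", format))
  .withColumn("roundtrip", from_unixtime($"seconds", "HH:mm:ss"))
  .show()
// roundtrip comes back as 09:33:57 and 09:43:02 -- the milliseconds are gone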

So if you want to preserve milliseconds you can use java.time.LocalTime, as you already do, and store the result of toNanoOfDay:

import org.apache.spark.sql.functions.udf

val nanoOfDay = udf((s: String) =>
  java.time.LocalTime.parse(s).toNanoOfDay)

// nanoseconds since midnight, millisecond part preserved
df.withColumn("nanoseconds", nanoOfDay($"time"))
