Spark SQL - How to select on dates stored as UTC millis from the epoch?

Problem description

I have been searching and have not found a solution as to how one might query on dates stored as UTC milliseconds from the epoch using Spark SQL. The schema I have pulled in from a NoSQL datasource (JSON from MongoDB) has the target date as:

|-- dateCreated: struct (nullable = true)
|    |-- $date: long (nullable = true)

The full schema is as follows:

scala> accEvt.printSchema
root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- appId: integer (nullable = true)
 |-- cId: long (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- expires: struct (nullable = true)
 |    |    |-- $date: long (nullable = true)
 |    |-- metadata: struct (nullable = true)
 |    |    |-- another key: string (nullable = true)
 |    |    |-- class: string (nullable = true)
 |    |    |-- field: string (nullable = true)
 |    |    |-- flavors: string (nullable = true)
 |    |    |-- foo: string (nullable = true)
 |    |    |-- location1: string (nullable = true)
 |    |    |-- location2: string (nullable = true)
 |    |    |-- test: string (nullable = true)
 |    |    |-- testKey: string (nullable = true)
 |    |    |-- testKey2: string (nullable = true)
 |-- dateCreated: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- id: integer (nullable = true)
 |-- originationDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- processedDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- receivedDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)

and my goal is to write queries along the lines of:

SELECT COUNT(*) FROM myTable WHERE dateCreated BETWEEN [dateStoredAsLong0] AND [dateStoredAsLong1]

My process thus far has been:

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@29200d25

scala> val accEvt = sqlContext.jsonFile("/home/bkarels/mongoexport/accomplishment_event.json")

...
14/10/29 15:03:38 INFO SparkContext: Job finished: reduce at JsonRDD.scala:46, took 4.668981083 s
accEvt: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[6] at RDD at SchemaRDD.scala:103

scala> accEvt.registerAsTable("accomplishmentEvent")

(At this point, the following baseline query executes successfully:)

scala> sqlContext.sql("select count(*) from accomplishmentEvent").collect.foreach(println)
...
[74475]

Now, the voodoo that I cannot get right is how to form my select statement to reason about the dates. For example, the following executes without error, but returns zero rather than the count of all records as it should (74475).

scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate >= '1970-01-01'").collect.foreach(println)
...
[0]

I have also tried some ugliness like:

scala> val now = new java.util.Date()
now: java.util.Date = Wed Oct 29 15:05:15 CDT 2014

scala> val today = now.getTime
today: Long = 1414613115743

scala> val thirtydaysago = today - (30 * 24 * 60 * 60 * 1000)
thirtydaysago: Long = 1416316083039


scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate <= %s and processedDate >= %s".format(today,thirtydaysago)).collect.foreach(println)

As recommended, I've selected on a named field to ensure that works. So:

scala> sqlContext.sql("select receivedDate from accomplishmentEvent limit 10").collect.foreach(println)

which returns:

[[1376318850033]]
[[1376319429590]]
[[1376320804289]]
[[1376320832835]]
[[1376320832960]]
[[1376320835554]]
[[1376320914480]]
[[1376321041899]]
[[1376321109341]]
[[1376321121469]]

Then, extending this to try to get some kind of date filtering working, I have tried:

scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.date > '1970-01-01' limit 5").collect.foreach(println)

which results in the error:

java.lang.RuntimeException: No such field date in StructType(ArrayBuffer(StructField($date,LongType,true)))
...

Prefixing the field name with $, as was also suggested, results in a different kind of error:

scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5").collect.foreach(println)
java.lang.RuntimeException: [1.69] failure: ``UNION'' expected but ErrorToken(illegal character) found

select actualConsumerId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5

Clearly I am not getting how to select on dates stored in this way - can anyone help me fill in this gap?

I am newer to both Scala and Spark, so forgive me if this is an elementary question, but my searches have turned up empty on the forums and Spark documentation.

Thank you.

Recommended answer

Your JSON is not flat, so the fields below the top level need to be addressed using qualified names, such as dateCreated.$date. Your specific date fields are all of long type, so you'll need to do numerical comparisons on them and it looks like you were on the right track for doing those.
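
For example, a human-readable date can be converted to epoch milliseconds before it is used as a comparison bound. A small sketch (the date string and format here are just placeholders):

val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd")
fmt.setTimeZone(java.util.TimeZone.getTimeZone("UTC"))
val lowerBound: Long = fmt.parse("2013-08-12").getTime  // epoch millis to compare against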

An additional problem is that your field names have "$" characters, and Spark SQL won't let you query on them. One solution is, instead of reading the JSON directly as a SchemaRDD (as you have done), to first read it as an RDD[String], use the map method to perform the Scala string manipulations of your choice, and then use SQLContext's jsonRDD method to create the SchemaRDD.

val lines = sc.textFile(...)
// you may want something less naive than global replacement of all "$" chars
val linesFixed = lines.map(s => s.replaceAllLiterally("$", ""))
val accEvt = sqlContext.jsonRDD(linesFixed)

I've tested this with Spark 1.1.0.
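
A minimal sketch of how a query might then look, assuming the cleaned-up SchemaRDD is registered under a (hypothetical) table name and that stripping the "$" leaves the nested field named date:

accEvt.registerAsTable("accomplishmentEventClean")  // hypothetical table name

// epoch-millisecond bounds, computed with Long arithmetic to avoid Int overflow
val today = System.currentTimeMillis
val thirtydaysago = today - 30L * 24 * 60 * 60 * 1000

sqlContext.sql(
  ("select count(*) from accomplishmentEventClean " +
   "where dateCreated.date >= %d and dateCreated.date <= %d").format(thirtydaysago, today)
).collect().foreach(println)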

For reference, the lack of quoting capability in Spark SQL has been noted in this bug report and perhaps others, and it seems that the fix was recently checked in, but it will take some time to make it into a release.
