Spark SQL - How to select on dates stored as UTC millis from the epoch?


Question


I have been searching and have not found a solution as to how one might query on dates stored as UTC milliseconds from the epoch using Spark SQL. The schema I have pulled in from a NoSQL datasource (JSON from MongoDB) has the target date as:

|-- dateCreated: struct (nullable = true)
|    |-- $date: long (nullable = true)


The complete schema is as follows:

scala> accEvt.printSchema
root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- appId: integer (nullable = true)
 |-- cId: long (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- expires: struct (nullable = true)
 |    |    |-- $date: long (nullable = true)
 |    |-- metadata: struct (nullable = true)
 |    |    |-- another key: string (nullable = true)
 |    |    |-- class: string (nullable = true)
 |    |    |-- field: string (nullable = true)
 |    |    |-- flavors: string (nullable = true)
 |    |    |-- foo: string (nullable = true)
 |    |    |-- location1: string (nullable = true)
 |    |    |-- location2: string (nullable = true)
 |    |    |-- test: string (nullable = true)
 |    |    |-- testKey: string (nullable = true)
 |    |    |-- testKey2: string (nullable = true)
 |-- dateCreated: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- id: integer (nullable = true)
 |-- originationDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- processedDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- receivedDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)


and my goal is to write queries along the lines of:

SELECT COUNT(*) FROM myTable WHERE dateCreated BETWEEN [dateStoredAsLong0] AND [dateStoredAsLong1]

My process so far has been:

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@29200d25

scala> val accEvt = sqlContext.jsonFile("/home/bkarels/mongoexport/accomplishment_event.json")

...
14/10/29 15:03:38 INFO SparkContext: Job finished: reduce at JsonRDD.scala:46, took 4.668981083 s
accEvt: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[6] at RDD at SchemaRDD.scala:103

scala> accEvt.registerAsTable("accomplishmentEvent")

(At this point the following baseline query executes successfully:)

scala> sqlContext.sql("select count(*) from accomplishmentEvent").collect.foreach(println)
...
[74475]


Now, the voodoo that I cannot get right is how to form my select statement to reason about the dates. For example, the following executes without error, but returns zero rather than the count of all records as it should (74475).

scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate >= '1970-01-01'").collect.foreach(println)
...
[0]


I have also tried some ugliness like:

scala> val now = new java.util.Date()
now: java.util.Date = Wed Oct 29 15:05:15 CDT 2014

scala> val today = now.getTime
today: Long = 1414613115743

scala> val thirtydaysago = today - (30 * 24 * 60 * 60 * 1000)
thirtydaysago: Long = 1416316083039


scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate <= %s and processedDate >= %s".format(today,thirtydaysago)).collect.foreach(println)


As recommended, I've selected on a named field to ensure that works. So:

scala> sqlContext.sql("select receivedDate from accomplishmentEvent limit 10").collect.foreach(println)

返回:

[[1376318850033]]
[[1376319429590]]
[[1376320804289]]
[[1376320832835]]
[[1376320832960]]
[[1376320835554]]
[[1376320914480]]
[[1376321041899]]
[[1376321109341]]
[[1376321121469]]


Then, extending that to try to get some kind of date comparison working, I have tried:

scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.date > '1970-01-01' limit 5").collect.foreach(println)

which results in the error:

java.lang.RuntimeException: No such field date in StructType(ArrayBuffer(StructField($date,LongType,true)))
...


Prefixing the field name with $, as was also suggested, results in a different kind of error:

scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5").collect.foreach(println)
java.lang.RuntimeException: [1.69] failure: ``UNION'' expected but ErrorToken(illegal character) found

select cId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5


Clearly I am not getting how to select on dates stored in this way - can anyone help me fill in this gap?


I am new to both Scala and Spark, so forgive me if this is an elementary question, but my searches have turned up empty on the forums and in the Spark documentation.

Thank you.

Answer


Your JSON is not flat, so the fields below the top level need to be addressed using qualified names, such as dateCreated.$date. Your specific date fields are all of long type, so you'll need to do numerical comparisons on them and it looks like you were on the right track for doing those.


An additional problem is that your field names have "$" characters, and Spark SQL won't let you query on them. One solution is that instead of reading the JSON directly as a SchemaRDD (as you have done) you first read it as an RDD[String], use the map method to perform the Scala string manipulations of your choice, and then use SQLContext's jsonRDD method to create the SchemaRDD.

val lines = sc.textFile(...)
// you may want something less naive than global replacement of all "$" chars
val linesFixed = lines.map(s => s.replaceAllLiterally("$", ""))
val accEvt = sqlContext.jsonRDD(linesFixed)


I've tested this with Spark 1.1.0.
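
With the $ characters stripped, the nested date fields become addressable by plain qualified names such as receivedDate.date, and your goal query becomes a numeric range comparison. A minimal sketch (assuming the cleaned accEvt from above is registered as you did before, and using Long arithmetic for the bounds):

// re-register the cleaned SchemaRDD under the same table name
accEvt.registerAsTable("accomplishmentEvent")

val now = System.currentTimeMillis
val thirtyDaysAgo = now - 30L * 24 * 60 * 60 * 1000  // Long literal avoids Int overflow

// "$date" has become "date" after the replacement above
sqlContext.sql(
  ("select count(*) from accomplishmentEvent " +
   "where receivedDate.date >= %s and receivedDate.date <= %s").format(thirtyDaysAgo, now)
).collect.foreach(println)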


For reference, the lack of quoting capability in Spark SQL has been noted in this bug report and perhaps others, and it seems that the fix was recently checked in, but it will take some time to make it into a release.
