Spark Window Functions - rangeBetween dates


Problem Description

I have a Spark SQL DataFrame with data, and what I'm trying to get is all the rows preceding the current row within a given date range. So, for example, I want to have all the rows from 7 days back preceding a given row. I figured out I need to use a Window Function like:

Window \
    .partitionBy('id') \
    .orderBy('start')

and here comes the problem. I want a rangeBetween of 7 days, but there is nothing in the Spark docs I could find on this. Does Spark even provide such an option? For now I'm just getting all the preceding rows with:

.rowsBetween(-sys.maxsize, 0)

but would like to achieve something like:

.rangeBetween("7 days", 0)

If anyone could help me with this one I'd be very grateful. Thanks in advance!

Solution

Spark >= 2.3

Since Spark 2.3 it has been possible to use interval objects with the SQL API, but DataFrame API support is still a work in progress.

df.createOrReplaceTempView("df")

spark.sql(
    """SELECT *, mean(some_value) OVER (
        PARTITION BY id 
        ORDER BY CAST(start AS timestamp) 
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
     ) AS mean FROM df""").show()

## +---+----------+----------+------------------+       
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

Spark < 2.3

As far as I know it is not possible directly in either Spark or Hive. Both require the ORDER BY clause used with RANGE to be numeric. The closest thing I found is a conversion to timestamp and operating on seconds. Assuming the start column contains a date type:

from pyspark.sql import Row
from pyspark.sql.functions import col

row = Row("id", "start", "some_value")
df = sc.parallelize([
    row(1, "2015-01-01", 20.0),
    row(1, "2015-01-06", 10.0),
    row(1, "2015-01-07", 25.0),
    row(1, "2015-01-12", 30.0),
    row(2, "2015-01-01", 5.0),
    row(2, "2015-01-03", 30.0),
    row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))
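
Optionally, a quick hypothetical check (not in the original answer) that start was parsed as a date and that the timestamp-to-long cast used below really yields UNIX seconds:

# inspect the schema and the numeric ordering values
df.printSchema()

df.select(
    col("start"),
    col("start").cast("timestamp").cast("long").alias("start_seconds")
).show(3)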

A small helper and window definition:

from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col


# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400 
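
For example, days(7) evaluates to 604800, i.e. seven 24-hour days expressed in seconds, which matches the unit of the cast-to-long timestamps used in the window below (this counts fixed 86400-second days and ignores DST shifts).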

Finally, the query:

w = (Window()
   .partitionBy(col("id"))
   .orderBy(col("start").cast("timestamp").cast("long"))
   .rangeBetween(-days(7), 0))

df.select(col("*"), mean("some_value").over(w).alias("mean")).show()

## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

Far from pretty but works.
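
If the double cast feels clumsy, a minimal alternative sketch (not from the original answer) is to use unix_timestamp from pyspark.sql.functions for the numeric ordering column; it assumes the same df and days helper defined above:

from pyspark.sql.functions import unix_timestamp

# unix_timestamp returns UNIX seconds, replacing the
# cast("timestamp").cast("long") chain in the ORDER BY column
w_alt = (Window()
   .partitionBy(col("id"))
   .orderBy(unix_timestamp(col("start").cast("timestamp")))
   .rangeBetween(-days(7), 0))

df.select(col("*"), mean("some_value").over(w_alt).alias("mean")).show()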


* Hive Language Manual, Types
