What does the 'pyspark.sql.functions.window' function's 'startTime' argument do?


Question

In the official docs there is just a simple example:

The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide startTime as 15 minutes.
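For instance, a minimal sketch of that documented hourly case (the events DataFrame and its ts/amount columns are assumptions for illustration only, not part of the question):

from pyspark.sql import functions as F

# Hourly tumbling windows that start 15 minutes past the hour,
# e.g. 12:15-13:15, 13:15-14:15, ...
hourly = (events
          .groupBy(F.window("ts", "1 hour", startTime="15 minutes"))
          .agg(F.sum("amount").alias("total")))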

But I want to know how it works with all of the arguments.

For example:

import datetime
import pprint
from pyspark.sql.functions import window, sum  # pyspark's sum, not the builtin

ts_list = list(map(lambda x: datetime.datetime(2017, 1, 9, 9, 0, 10) + datetime.timedelta(seconds=x), range(30)))
rdd = spark.sparkContext.parallelize(ts_list).map(lambda x: (x, 1))
df = spark.createDataFrame(rdd, schema=['dt', 'val'])
win = df.groupBy(window("dt", "5 seconds", '4 seconds', '3 seconds')).agg(sum("val").alias("sum"))
pprint.pprint(win.select(win['window']['start'].cast('string').alias('start'),
                         win['window']['end'].cast('string').alias('end')).collect())

Output:

[Row(start=u'2017-01-09 09:00:19', end=u'2017-01-09 09:00:24'),                 
 Row(start=u'2017-01-09 09:00:35', end=u'2017-01-09 09:00:40'),
 Row(start=u'2017-01-09 09:00:27', end=u'2017-01-09 09:00:32'),
 Row(start=u'2017-01-09 09:00:07', end=u'2017-01-09 09:00:12'),
 Row(start=u'2017-01-09 09:00:31', end=u'2017-01-09 09:00:36'),
 Row(start=u'2017-01-09 09:00:39', end=u'2017-01-09 09:00:44'),
 Row(start=u'2017-01-09 09:00:11', end=u'2017-01-09 09:00:16'),
 Row(start=u'2017-01-09 09:00:23', end=u'2017-01-09 09:00:28'),
 Row(start=u'2017-01-09 09:00:15', end=u'2017-01-09 09:00:20')]

So why is that?

Answer

It has nothing to do with when your data starts. Of course, the first window will only appear once you have some data in that window. But the startTime has nothing to do with your data. As the documentation says, the startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. If you create a window like this:

w = F.window("date_field", "7 days", startTime='6 days')

Spark will generate windows of 7 days starting from 1970-01-06:


1970-01-06 19:00:00, 1970-01-13 19:00:00
1970-01-13 19:00:00, 1970-01-20 19:00:00
1970-01-20 19:00:00, 1970-01-27 19:00:00
...
2017-05-16 19:00:00, 2017-05-23 19:00:00
(if you continue calculating, you eventually get to this date) ...
But you will only see the windows that are related to the dates in your dataframe. The 19:00:00 is because of my timezone, which is -05.
If you create a window like this:

w = F.window("date_field", "7 days", startTime='2 days')

w = F.window("date_field", "7 days", startTime='2 days')

Spark will generate windows of 7 days starting from 1970-01-02:


1970-01-02 19:00:00, 1970-01-09 19:00:00
1970-01-09 19:00:00, 1970-01-16 19:00:00
...
2017-05-19 19:00:00, 2017-05-26 19:00:00
(if you continue calculating, you eventually get to this date) ...

Again, you will only see the windows that are related to the dates in your dataframe.
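The same reasoning explains the output in the question: with a window of '5 seconds', a slide of '4 seconds' and a startTime of '3 seconds', every window start sits at epoch + 3 seconds + k * 4 seconds, which is why all the start times in the output land on seconds 07, 11, 15, 19, ... Below is a rough sketch of that bucketing done by hand rather than by Spark (my own reconstruction, ignoring timezone display, which only shifts the result by whole minutes or hours):

import math
from datetime import datetime, timedelta

window_len = timedelta(seconds=5)
slide = timedelta(seconds=4)
start_time = timedelta(seconds=3)          # the startTime offset
epoch = datetime(1970, 1, 1)

def windows_for(ts):
    """All [start, end) windows that contain ts."""
    k = (ts - epoch - start_time) // slide          # index of the last window start at or before ts
    result = []
    for i in range(math.ceil(window_len / slide)):  # a row can fall into ceil(5/4) = 2 windows
        start = epoch + start_time + (k - i) * slide
        if start <= ts < start + window_len:
            result.append((start, start + window_len))
    return result

# The record at 09:00:10 only fits the window 09:00:07 - 09:00:12;
# the record at 09:00:11 fits both 09:00:07 - 09:00:12 and 09:00:11 - 09:00:16.
print(windows_for(datetime(2017, 1, 9, 9, 0, 10)))
print(windows_for(datetime(2017, 1, 9, 9, 0, 11)))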

So, how can you calculate the start date for the windows of your data?
You just need to calculate the number of days from 1970-01-01 to your desired start date, divide it by the length of your window (in days), and take the remainder. That remainder is the startTime offset in days.


I will explain it with an example: assuming that you need your windows to start at 2017-05-21 and the length of the windows is 7 days, I will create a dummy dataframe for the example.

row = Row("id", "date_field", "value")
df = sc.parallelize([
row(1, "2017-05-23", 5.0),
row(1, "2017-05-26", 10.0),
row(1, "2017-05-29", 4.0),
row(1, "2017-06-10", 3.0),]).toDF()

start_date = datetime(2017, 5, 21, 19, 0, 0) # 19:00:00 because my 
timezone 
days_since_1970_to_start_date =int(time.mktime(start_date.timetuple())/86400)
offset_days = days_since_1970_to_start_date % 7

w = F.window("date_field", "7 days", startTime='{} days'.format(
                                        offset_days))

df.groupby("id", w).agg(F.sum("value")).orderBy("window.start").show(10, False)

You will get:

+---+------------------------------------------+----------+
|id |window                                    |sum(value)|
+---+------------------------------------------+----------+
|1  |[2017-05-21 19:00:00, 2017-05-28 19:00:00]|15.0      |
|1  |[2017-05-28 19:00:00, 2017-06-04 19:00:00]|4.0       |
|1  |[2017-06-04 19:00:00, 2017-06-11 19:00:00]|3.0       |
+---+------------------------------------------+----------+
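One side note on the offset computation: time.mktime interprets start_date in the machine's local timezone, so the offset you get depends on where the code runs. If you prefer to pin the computation to UTC, the same remainder can be obtained with plain datetime arithmetic; a small sketch under that assumption, reusing the column name from the example above:

from datetime import datetime, timezone
from pyspark.sql import functions as F

# Desired first window boundary expressed in UTC:
# 2017-05-21 19:00 in UTC-5 is 2017-05-22 00:00 UTC.
desired_start = datetime(2017, 5, 22, tzinfo=timezone.utc)
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)

offset_days = (desired_start - epoch).days % 7   # 17308 % 7 -> 4
w = F.window("date_field", "7 days", startTime="{} days".format(offset_days))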

