What does the 'pyspark.sql.functions.window' function's 'startTime' argument do?


Question

In the official doc there is just a simple example:

The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide startTime as 15 minutes.
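For reference, a minimal sketch (my own illustration, not taken from the docs) of that documented example, i.e. hourly tumbling windows that start 15 minutes past the hour; the displayed boundaries depend on the session timezone:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# two sample events, one in each expected window
df = spark.createDataFrame([("2017-01-09 12:20:00",), ("2017-01-09 13:40:00",)], ["dt"]) \
          .withColumn("dt", F.col("dt").cast("timestamp"))

# hourly tumbling windows shifted to start 15 minutes past the hour
df.select(F.window("dt", "1 hour", startTime="15 minutes").alias("w")) \
  .select("w.start", "w.end").show(truncate=False)
# expected (with a whole-hour session timezone offset): 12:15:00 -> 13:15:00 and 13:15:00 -> 14:15:00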

But I want to know how it works with all the arguments.

For example:

import datetime
import pprint
from pyspark.sql.functions import window, sum

# one event per second for 30 seconds, starting at 2017-01-09 09:00:10
ts_list = map(lambda x: datetime.datetime(2017, 1, 9, 9, 0, 10) + datetime.timedelta(seconds=x), range(30))
rdd = spark.sparkContext.parallelize(ts_list).map(lambda x: (x, 1))
df = spark.createDataFrame(rdd, schema=['dt', 'val'])
win = df.groupBy(window("dt", "5 seconds", '4 seconds', '3 seconds')).agg(sum("val").alias("sum"))
pprint.pprint(win.select(win['window']['start'].cast('string').alias('start'),
                         win['window']['end'].cast('string').alias('end')).collect())

Output:

[Row(start=u'2017-01-09 09:00:19', end=u'2017-01-09 09:00:24'),                 
 Row(start=u'2017-01-09 09:00:35', end=u'2017-01-09 09:00:40'),
 Row(start=u'2017-01-09 09:00:27', end=u'2017-01-09 09:00:32'),
 Row(start=u'2017-01-09 09:00:07', end=u'2017-01-09 09:00:12'),
 Row(start=u'2017-01-09 09:00:31', end=u'2017-01-09 09:00:36'),
 Row(start=u'2017-01-09 09:00:39', end=u'2017-01-09 09:00:44'),
 Row(start=u'2017-01-09 09:00:11', end=u'2017-01-09 09:00:16'),
 Row(start=u'2017-01-09 09:00:23', end=u'2017-01-09 09:00:28'),
 Row(start=u'2017-01-09 09:00:15', end=u'2017-01-09 09:00:20')]

So why is that?

Answer

It has nothing to do with when your data starts. Of course, the first window will only appear once you have some data in that window. But the startTime has nothing to do with your data. As the documentation says, the startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. If you create a window like this:
w = F.window("date_field", "7 days", startTime='6 days')

Spark will generate 7-day windows starting from 1970-01-06:

1970-01-06 19:00:00, 1970-01-13 19:00:00
1970-01-13 19:00:00, 1970-01-20 19:00:00
1970-01-20 19:00:00, 1970-01-27 19:00:00
...
2017-05-16 19:00:00, 2017-05-23 19:00:00
(if you continue calculating you get to this date) ...
But you will only see the windows that are related to the dates in your dataframe. The 19:00:00 is because my timezone is -05.
If you create a window like this:

w = F.window("date_field","7 days",startTime ='2 days')

w = F.window("date_field", "7 days", startTime='2 days')

Spark will generate 7-day windows starting from 1970-01-02:

1970-01-02 19:00:00, 1970-01-09 19:00:00
1970-01-09 19:00:00, 1970-01-16 19:00:00
...
2017-05-19 19:00:00, 2017-05-26 19:00:00
(if you continue calculating you get to this date)
...

Again, you will only see the windows that are related to the dates in your dataframe.

So, how can you calculate the startTime for the windows of your data?
You just need to calculate the number of days from 1970-01-01 to the date you want your windows to start, divide it by the length of your window, and take the remainder. That remainder, expressed in days, is the startTime offset.


I will explain it with an example. Assuming that you need your windows to start at 2017-05-21 and the length of the windows is 7 days, I will create a dummy dataframe for the example.

row = Row("id", "date_field", "value")
df = sc.parallelize([
row(1, "2017-05-23", 5.0),
row(1, "2017-05-26", 10.0),
row(1, "2017-05-29", 4.0),
row(1, "2017-06-10", 3.0),]).toDF()

import time
from datetime import datetime
import pyspark.sql.functions as F

start_date = datetime(2017, 5, 21, 19, 0, 0)  # 19:00:00 because of my timezone (-05)
days_since_1970_to_start_date = int(time.mktime(start_date.timetuple()) / 86400)
offset_days = days_since_1970_to_start_date % 7

w = F.window("date_field", "7 days", startTime='{} days'.format(offset_days))

df.groupby("id", w).agg(F.sum("value")).orderBy("window.start").show(10, False)

You will get:

+---+------------------------------------------+----------+
|id |window                                    |sum(value)|
+---+------------------------------------------+----------+
|1  |[2017-05-21 19:00:00, 2017-05-28 19:00:00]|15.0      |
|1  |[2017-05-28 19:00:00, 2017-06-04 19:00:00]|4.0       |
|1  |[2017-06-04 19:00:00, 2017-06-11 19:00:00]|3.0       |
+---+------------------------------------------+----------+
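
The same remainder logic also explains the output in the question: with a 5-second window, a 4-second slide, and startTime='3 seconds', every window start falls 3 seconds plus a multiple of 4 seconds past the epoch, which is why the starts land on seconds 07, 11, 15, ..., 39. Below is a minimal pure-Python sketch of that arithmetic (my own check, assuming whole-minute timezone offsets so the second-of-minute alignment is unaffected):

import datetime

epoch = datetime.datetime(1970, 1, 1)
win_len, slide, start_offset = 5, 4, 3  # seconds, as in the question

starts = set()
for i in range(30):
    t = datetime.datetime(2017, 1, 9, 9, 0, 10) + datetime.timedelta(seconds=i)
    sec = int((t - epoch).total_seconds())
    # latest window start <= t: align (sec - start_offset) down to a multiple of the slide
    s = sec - (sec - start_offset) % slide
    # walk back one slide at a time while the window still covers t
    while s > sec - win_len:
        starts.add(epoch + datetime.timedelta(seconds=s))
        s -= slide

print(sorted(starts))
# window starts: 09:00:07, 09:00:11, ..., 09:00:39 -- matching the question's output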
