Append only new aggregates based on groupby keys
Problem Description
I have to process some files that arrive daily. The data has a primary key of (date, client_id, operation_id), so I created a stream that appends only new data into a Delta table:
operations \
    .repartition('date') \
    .writeStream \
    .outputMode('append') \
    .trigger(once=True) \
    .option("checkpointLocation", "/mnt/sandbox/operations/_chk") \
    .format('delta') \
    .partitionBy('date') \
    .start('/mnt/sandbox/operations')
This is working fine, but I need to summarize this information grouped by (date, client_id), so I created another stream that reads from this operations table into a new table:
summarized = spark.readStream.format('delta').load('/mnt/sandbox/operations')
summarized = summarized.groupBy('client_id', 'date').agg(<a lot of aggs>)

summarized.repartition('date') \
    .writeStream \
    .outputMode('complete') \
    .trigger(once=True) \
    .option("checkpointLocation", "/mnt/sandbox/summarized/_chk") \
    .format('delta') \
    .partitionBy('date') \
    .start('/mnt/sandbox/summarized')
This works, but every time new data lands in the operations table, Spark recomputes summarized from scratch. I tried using append output mode on the second stream, but it requires a watermark, and date is a DateType column.
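For reference, append output mode with a streaming aggregation is only accepted when the grouping involves a watermarked event-time column, and withWatermark() expects a timestamp-typed column rather than a DateType one, so an attempt along those lines would look roughly like this (a sketch only; the cast and the one-day delay are assumptions, not code from this question):

from pyspark.sql import functions as F

# Sketch only: withWatermark needs a timestamp column, so the DateType 'date'
# column would have to be cast before append mode can be used at all.
with_watermark = (
    spark.readStream.format('delta').load('/mnt/sandbox/operations')
         .withColumn('event_ts', F.col('date').cast('timestamp'))  # assumed cast
         .withWatermark('event_ts', '1 day')                       # assumed delay
)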
Is there a way to compute only the new aggregates for the incoming group keys and append them to summarized?
Recommended Answer
When you use windowed operations, Spark buckets the data according to windowDuration and slideDuration. windowDuration is the length of the window, and slideDuration is how far the window slides each time.
If you group by using window() [docs], the result has a window column alongside the other columns you group by, such as client_id.
For example:
windowDuration = "10 minutes"
slideDuration = "5 minutes"
summarized = before_summary.groupBy(before_summary.client_id,
window(before_summary.date, windowDuration, slideDuration)
).agg(<a lot of aggs>).orderBy('window')
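To get append output mode working end to end (so summarized is not rebuilt on every run), the windowed grouping also needs a watermark on the event-time column. A rough sketch under the same paths and column names, where the cast to timestamp, the one-day watermark, the placeholder aggregation, and the derived date column are all assumptions rather than code from the question:

from pyspark.sql import functions as F

before_summary = (
    spark.readStream.format('delta').load('/mnt/sandbox/operations')
         .withColumn('event_ts', F.col('date').cast('timestamp'))  # assumed cast from DateType
         .withWatermark('event_ts', '1 day')                       # assumed watermark delay
)

summarized = (
    before_summary
        .groupBy('client_id', F.window('event_ts', '1 day'))
        .agg(F.count('*').alias('n_operations'))                   # placeholder for <a lot of aggs>
        .withColumn('date', F.col('window.start').cast('date'))    # recover a date column for partitioning
)

summarized.repartition('date') \
    .writeStream \
    .outputMode('append') \
    .trigger(once=True) \
    .option("checkpointLocation", "/mnt/sandbox/summarized/_chk") \
    .format('delta') \
    .partitionBy('date') \
    .start('/mnt/sandbox/summarized')

Note that in append mode a window's aggregate is only emitted after the watermark has passed the end of that window, so with trigger(once=True) a given day's row typically only appears on a later run, and data arriving later than the watermark allows is dropped.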