Append only new aggregates based on groupby keys


Problem Description

I have to process some files that arrive daily. The data has a primary key of (date, client_id, operation_id). So I created a stream that appends only new data into a Delta table:

operations\
        .repartition('date')\
        .writeStream\
        .outputMode('append')\
        .trigger(once=True)\
        .option("checkpointLocation", "/mnt/sandbox/operations/_chk")\
        .format('delta')\
        .partitionBy('date')\
        .start('/mnt/sandbox/operations')

This is working fine, but I need to summarize this information grouped by (date, client_id), so I created another stream that reads from this operations table into a new table:

summarized= spark.readStream.format('delta').load('/mnt/sandbox/operations')

summarized= summarized.groupBy('client_id','date').agg(<a lot of aggs>)

summarized.repartition('date')\
        .writeStream\
        .outputMode('complete')\
        .trigger(once=True)\
        .option("checkpointLocation", "/mnt/sandbox/summarized/_chk")\
        .format('delta')\
        .partitionBy('date')\
        .start('/mnt/sandbox/summarized')

This is working, but every time new data lands in the operations table, Spark recalculates summarized all over again. I tried to use append mode on the second stream, but it requires a watermark, and date is a DateType.

Is there a way to calculate only the new aggregates based on the group keys and append them to summarized?

Recommended Answer

When you use windowed operations, Spark buckets the rows according to windowDuration and slideDuration. windowDuration sets the length of each window, and slideDuration sets by how much time the window slides.
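To make the bucketing concrete, here is a small plain-Python sketch (not Spark itself, just an illustration of the same semantics) that lists which sliding windows a given event time falls into, assuming windows are aligned to the epoch as Spark's window() does by default:

```python
from datetime import datetime, timedelta

def window_buckets(ts, window_minutes, slide_minutes):
    """Return the [start, end) windows containing ts, mimicking how
    Spark's window() assigns events to epoch-aligned sliding windows."""
    window = timedelta(minutes=window_minutes)
    slide = timedelta(minutes=slide_minutes)
    epoch = datetime(1970, 1, 1)
    # latest slide boundary at or before ts
    last_start = epoch + timedelta(
        seconds=((ts - epoch) // slide) * slide.total_seconds())
    buckets = []
    start = last_start
    # walk backwards over every window whose span still covers ts
    while start > ts - window:
        buckets.append((start, start + window))
        start -= slide
    return sorted(buckets)

# a 10:07 event with a 10-minute window sliding every 5 minutes
for start, end in window_buckets(datetime(2019, 7, 1, 10, 7), 10, 5):
    print(start.time(), "-", end.time())
# the event lands in two overlapping windows: 10:00-10:10 and 10:05-10:15
```

With windowDuration equal to slideDuration you get tumbling (non-overlapping) windows, and each event belongs to exactly one bucket.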

If you group by using window() [docs], you get a resulting window column along with the other columns you group by, such as client_id.

For example:

from pyspark.sql.functions import window

windowDuration = "10 minutes"
slideDuration = "5 minutes"
summarized = before_summary.groupBy(before_summary.client_id,
    window(before_summary.date, windowDuration, slideDuration)
).agg(<a lot of aggs>).orderBy('window')
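Putting this together with the question's pipeline, one possible shape for the second stream is sketched below. This is an assumption-laden sketch, not a verified solution: it assumes a `spark` session with Delta support, the paths from the question, that the DateType column can be cast to a timestamp (withWatermark requires TimestampType), and that a "1 day" lateness bound fits the daily-arrival pattern:

```python
from pyspark.sql.functions import col, window, to_timestamp

# Stream from the operations Delta table; derive an event-time column,
# since withWatermark() needs TimestampType rather than DateType.
before_summary = (spark.readStream
    .format('delta')
    .load('/mnt/sandbox/operations')
    .withColumn('event_time', to_timestamp(col('date')))
    .withWatermark('event_time', '1 day'))  # assumed lateness bound

# A tumbling one-day window matches the original (date, client_id) grain.
summarized = (before_summary
    .groupBy(before_summary.client_id, window('event_time', '1 day'))
    .agg(<a lot of aggs>))

# With a watermark in place, append mode emits each window's aggregates
# only once, after the watermark passes the end of that window.
(summarized
    .writeStream
    .outputMode('append')
    .trigger(once=True)
    .option('checkpointLocation', '/mnt/sandbox/summarized/_chk')
    .format('delta')
    .start('/mnt/sandbox/summarized'))
```

Note that the sink is no longer partitioned by date here, since the grouping column is now the derived window struct; a date column for partitioning could be extracted from window.start if needed.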

