Kafka Streams - Hopping windows - deduplicate keys


Question

I'm doing a hopping window aggregation on a 4-hour window advancing every 5 minutes. As the hopping windows overlap, I'm getting duplicate keys with different aggregated values.

// A 4-hour window advancing every 5 minutes (sizes in milliseconds)
TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60 * 1000L)
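In newer client versions, the same window can be written with the Duration-based overloads; this is just an equivalent sketch of the definition above:

// Equivalent definition using the Duration-based API
TimeWindows.of(Duration.ofHours(4)).advanceBy(Duration.ofMinutes(5))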

How do I eliminate duplicate keys with repeating data, or pick only the keys that hold the latest value?

Answer

Update May 2021: The Kafka Streams API supports "final" window results nowadays, via a suppress() operator. See the previous docs link as well as the blog Kafka Streams’ Take on Watermarks and Triggers from March 2019 for details.

After defining your windowed computation, you can suppress the intermediate results, emitting the final count for each user when the window is closed.

// Static imports assumed by this snippet:
//   import java.time.Duration;
//   import static java.time.Duration.ofMinutes;
//   import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

KGroupedStream<UserId, Event> grouped = ...;

// Count events per user in 1-hour windows, emit only the final result per window,
// and alert on users with fewer than 3 events.
grouped.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
       .count()
       .suppress(Suppressed.untilWindowCloses(unbounded()))
       .filter((windowedUserId, count) -> count < 3)
       .toStream()
       .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));
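Applied to the 4-hour window advancing every 5 minutes from the question, the pattern might look like the sketch below. The 10-minute grace period and the String key type are assumptions for illustration, not part of the original question:

// Sketch only: the grace period and key/value types are assumptions.
KGroupedStream<String, Event> groupedByKey = ...;

groupedByKey.windowedBy(TimeWindows.of(Duration.ofHours(4))
                                   .advanceBy(Duration.ofMinutes(5))
                                   .grace(Duration.ofMinutes(10)))
            .count()
            .suppress(Suppressed.untilWindowCloses(unbounded()))
            .toStream()
            .foreach((windowedKey, count) ->
                System.out.println(windowedKey.key() + " @ " + windowedKey.window().start() + " -> " + count));

Each key then appears once per window instance, emitted only when that window closes, instead of being updated continuously.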


Original answer (still applies when NOT using the suppress() operator above):

If I understand you correctly, then this is expected behavior. You are not seeing "duplicate" keys; rather, you are seeing continuous updates for the same key.

Consider:

# Extreme case: record caches disabled (size set to 0)
alice->1, alice->2, alice->3, alice->4, ..., alice->100, ...

# With record caches enabled, you would see something like this:
alice->23, alice->59, alice->100, ...

Take a look at the explanation at http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-memory-management, which describes this in more detail. If you want to see fewer "duplicates" per record key, you can increase the size of the record caches (when using the DSL) via cache.max.bytes.buffering, a.k.a. StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, in your application's configuration. There is also an interplay with commit.interval.ms.
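For reference, here is a minimal sketch of setting those two configs; the 10 MB cache size and 30-second commit interval are illustrative values, not recommendations:

// Illustrative values only; tune for your workload.
Properties props = new Properties();
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // record cache size in bytes
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);               // commit interval in ms

A larger cache and/or a longer commit interval means fewer, more consolidated updates per key downstream; setting the cache size to 0 emits every single update (the "extreme case" above).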

If you are wondering why the Kafka Streams API behaves this way in the first place, I'd recommend the blog post https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/, which was published earlier this week.
