Does the Kafka streams aggregation have any ordering guarantee?


Question

My Kafka topic contains statuses keyed by deviceId. I would like to use KStreamBuilder.stream().groupByKey().aggregate(...) to only keep the latest value of a status in a TimeWindow. I guess that, as long as the topic is partitioned by key, the aggregation function can always return the latest values in this fashion:

(key, value, old_value) -> value

Is this a guarantee I can expect from Kafka Streams? Should I roll my own processing method that checks the timestamp?

Answer

Kafka Streams guarantees ordering by offset, not by timestamp. Thus, by default the "last update wins" policy is based on offsets, not timestamps. Late-arriving records ("late" as defined by timestamps) are out of order with respect to timestamps, and they will not be reordered; the original offset order is preserved.
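A minimal sketch (plain Java, no Kafka broker; the class and names are illustrative, not Kafka Streams API) of why an offset-order "last update wins" reduce can pick a stale value when a record arrives late:

```java
import java.util.List;

public class OffsetOrderDemo {
    // A record as Streams processes it: in offset order,
    // but carrying its own event timestamp.
    record Rec(long offset, long timestamp, String value) {}

    // The naive DSL reduce the question describes:
    // (aggregate, newValue) -> newValue,
    // i.e. the last record *by offset* wins, regardless of timestamp.
    static String lastByOffset(List<Rec> window) {
        String agg = null;
        for (Rec r : window) {   // iterate in offset order
            agg = r.value();     // newValue always replaces the aggregate
        }
        return agg;
    }

    public static void main(String[] args) {
        // The second record is "late": larger offset, older timestamp.
        List<Rec> window = List.of(
            new Rec(0, 2000, "newer-status"),
            new Rec(1, 1000, "late-status"));
        System.out.println(lastByOffset(window)); // prints "late-status"
    }
}
```

So with out-of-order data, the offset-based winner is not necessarily the timestamp-latest status.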

If you want your window to contain the latest value based on timestamps, you will need to use the Processor API (PAPI) to make this work.

Within Kafka Streams' DSL, you cannot access the record timestamp that is required to get the correct result. An easy way might be to put a .transform() before .groupBy() and add the timestamp to the record (i.e., to its value) itself. Thus, you can use the timestamp within your Aggregator (btw: a .reduce(), which is simpler to use, might also work instead of .aggregate()). Finally, you need a .mapValues() after your .aggregate() to remove the timestamp from the value again.
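A hedged sketch of that wrap/compare/unwrap idea in plain Java. The Streams plumbing (.transform(), .groupByKey(), .reduce(), .mapValues()) is elided; only the logic each step would perform is shown, and all class and method names are illustrative:

```java
import java.util.List;

public class TimestampReduceDemo {
    // What the upstream .transform() step would produce:
    // the value wrapped together with its record timestamp.
    record Timestamped(long timestamp, String value) {}

    // What the Reducer would do: keep whichever value has the larger
    // timestamp, so a late-arriving (older) record cannot overwrite
    // a newer one.
    static Timestamped latestByTimestamp(Timestamped agg, Timestamped next) {
        return next.timestamp() >= agg.timestamp() ? next : agg;
    }

    // Fold a window's records (in offset order), then unwrap the value
    // as the final .mapValues() would.
    static String latestValue(List<Timestamped> window) {
        Timestamped agg = window.get(0);
        for (Timestamped t : window.subList(1, window.size())) {
            agg = latestByTimestamp(agg, t);
        }
        return agg.value(); // strip the timestamp again
    }

    public static void main(String[] args) {
        List<Timestamped> window = List.of(
            new Timestamped(2000, "newer-status"),
            new Timestamped(1000, "late-status")); // late arrival
        System.out.println(latestValue(window));   // prints "newer-status"
    }
}
```

Unlike the default offset-based reduce, the late record here loses the comparison and the timestamp-latest status survives.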

Using this mix-and-match approach of DSL and PAPI should simplify your code, as you can use the DSL's windowing support and KTable and do not need to do low-level time-window and state management yourself.

Of course, you can also just do all this in a single low-level stateful processor, but I would not recommend it.

