Kafka Streams - updating aggregations on KTable

Question

I have a KTable with data that looks like this (key => value), where keys are customer IDs, and values are small JSON objects containing some customer data:
1 => { "name" : "John", "age_group": "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
I'd like to do some aggregations on this KTable, and basically keep a count of the number of records for each age_group. The desired KTable data would look like this:
"18-24" => 3
"25-30" => 1
Let's say Alice, who is in the 18-24 group above, has a birthday that puts her in the new age group. The state store backing the first KTable should now look like this:
1 => { "name" : "John", "age_group": "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
And I'd like the resulting aggregated KTable results to reflect this, e.g.:
"18-24" => 2
"25-30" => 2
I may be over-generalizing here:

"In Kafka Streams there is no such thing as a final aggregation... Depending on your use case, manual de-duplication would be a way to resolve the issue"
But so far I have only been able to calculate a running total, e.g. Alice's birthday would be interpreted as:
"18-24" => 3 # Old Alice record still gets counted here
"25-30" => 2 # New Alice record gets counted here as well
---
Here is some additional behavior that I noticed that seems unexpected. The topology I'm using is:
dataKTable = builder.table("compacted-topic-1", "users-json")
.groupBy((key, value) -> KeyValue.pair(getAgeRange(value), key))
.count("age-range-counts")
---
1) Empty State

Now, from the initial, empty state, everything looks like this:
compacted-topic-1
(empty)
dataKTable
(empty)
// groupBy()
Repartition topic: $APP_ID-age-range-counts-repartition
(empty)
// count()
age-range-counts state store
(empty)
---
2) Send a couple of messages

Now, let's send a couple of messages to compacted-topic-1, which is streamed as the KTable above. Here is what happens:
compacted-topic-1
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
dataKTable
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
// groupBy()
// why does this generate 4 events???
Repartition topic: $APP_ID-age-range-counts-repartition
18-24 => 3
18-24 => 3
18-24 => 4
18-24 => 4
// count()
age-range-counts state store
18-24 => 0
---
So I'm wondering:
- Is what I'm trying to do even possible using Kafka Streams 0.10.1 or 0.10.2? I've tried using groupBy and count in the DSL, but maybe I need to use something like reduce?
- Also, I'm having a little trouble understanding the circumstances that lead to the add reducer and the subtract reducer being called, so any clarification around any of these points will be greatly appreciated.
Answer
If you have your original KTable containing id -> Json data (let's call it dataKTable), you should be able to get what you want via:
KTable<String, Long> countKTablePerRange =
    dataKTable.groupBy(/* map your age-range to be the key */)
              .count("someStoreName");
This should work for all versions of Kafka's Streams API.
Update
About the 4 values in the repartition topic: that's correct. Each update to the base KTable writes a record for its "old value" and its "new value". This is required to update the downstream KTable correctly: the old value must be removed from one count and the new value must be added to another count. Because your (count) KTable is potentially distributed (i.e., shared over multiple parallel running app instances), both records (old and new) might end up at different instances, because they might have different keys, and thus they must be sent as two independent records. (The record format should be more complex than what you show in your question, though.)
This also explains why you need a subtractor and an adder. The subtractor removes the old record from the aggregation result, while the adder adds the new record to the aggregation result.
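The old/new pair can be pictured as two independent repartition records, which is roughly what the trace in the question shows. The sketch below is a simplification (real repartition records carry more structure than a bare delta): Alice's single table update fans out into a subtract record for her old group and an add record for her new group, applied to the counts from the example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AdderSubtractorModel {
    // One repartition "record": a group key plus a delta
    // (-1 on the subtractor path, +1 on the adder path).
    static class Delta {
        final String group;
        final long change;
        Delta(String group, long change) { this.group = group; this.change = change; }
    }

    // Apply Alice's single table update, fanned out into two records.
    static Map<String, Long> applyBirthdayUpdate() {
        List<Delta> repartition = List.of(
            new Delta("18-24", -1), // old value -> subtractor
            new Delta("25-30", +1)  // new value -> adder
        );
        Map<String, Long> counts = new TreeMap<>(Map.of("18-24", 3L, "25-30", 1L));
        for (Delta d : repartition) {
            counts.merge(d.group, d.change, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(applyBirthdayUpdate()); // {18-24=2, 25-30=2}
    }
}
```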
Still not sure why you don't see the correct count in the result. How many instances do you run? Maybe try to disable the KTable cache by setting cache.max.bytes.buffering=0 in StreamsConfig.
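A minimal sketch of such a configuration, using plain java.util.Properties and the literal config key (the string name behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG); the application id and broker address are placeholders. With the cache disabled, every single update is forwarded downstream instead of being buffered and possibly folded together.

```java
import java.util.Properties;

public class DisableCacheConfig {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "age-range-counter");  // placeholder app id
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        // Forward every update immediately; don't buffer in the KTable cache.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("cache.max.bytes.buffering"));
    }
}
```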