Kafka Streams - updating aggregations on KTable


Problem description

I have a KTable with data that looks like this (key => value), where keys are customer IDs, and values are small JSON objects containing some customer data:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

I'd like to do some aggregations on this KTable, and basically keep a count of the number of records for each age_group. The desired KTable data would look like this:

"18-24" => 3
"25-30" => 1

Let's say Alice, who is in the 18-24 group above, has a birthday that puts her in the new age group. The state store backing the first KTable should now look like this:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

And I'd like the resulting aggregated KTable results to reflect this, e.g.:

"18-24" => 2
"25-30" => 2

I am possibly overgeneralizing here:

"In Kafka Streams there is no such thing as a final aggregation... Depending on your use case, manual de-duplication would be a way to resolve the issue."

But I have only been able to calculate a running total so far, e.g. Alice's birthday would be interpreted as:

"18-24" => 3 # Old Alice record still gets counted here
"25-30" => 2 # New Alice record gets counted here as well


Here is some additional behavior that I noticed that seems unexpected.

The topology I'm using is as follows:

KTable<String, String> dataKTable = builder.table("compacted-topic-1", "users-json");

KTable<String, Long> ageRangeCounts = dataKTable
    .groupBy((key, value) -> KeyValue.pair(getAgeRange(value), key))
    .count("age-range-counts");


1) Empty State

Now, from the initial, empty state, everything looks like this:

compacted-topic-1
(empty)


dataKTable
(empty)


// groupBy()
Repartition topic: $APP_ID-age-range-counts-repartition
(empty)

// count()
age-range-counts state store
(empty)


2) Send a couple of messages

Now, let's send a couple of messages to compacted-topic-1, which is streamed as the KTable above. Here is what happens:

compacted-topic-1
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

dataKTable
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }


// groupBy()
// why does this generate 4 events???
Repartition topic: $APP_ID-age-range-counts-repartition
18-24 => 3
18-24 => 3
18-24 => 4
18-24 => 4

// count()
age-range-counts state store
18-24 => 0


So I'm wondering:

  • Is what I'm trying to do even possible using Kafka Streams 0.10.1 or 0.10.2? I've tried using groupBy and count in the DSL, but maybe I need to use something like reduce?
  • Also, I'm having a little trouble understanding the circumstances that lead to the add reducer and the subtract reducer being called, so any clarification around these points would be greatly appreciated.

Answer

If you have your original KTable containing id -> JSON data (let's call it dataKTable), you should be able to get what you want via:

KTable<String, Long> countKTablePerRange
    = dataKTable.groupBy(/* map your age-range to be the key */)
                .count("someStoreName");

This should work for all versions of Kafka's Streams API.
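For completeness, here is a minimal, self-contained sketch of what that topology could look like on the 0.10.x API. The getAgeRange helper, the output topic name, and the serde and config choices are illustrative assumptions, not part of the original answer:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class AgeRangeCounts {

    // Hypothetical helper: extract the "age_group" field from the JSON value.
    // A real implementation would use a JSON library such as Jackson.
    static String getAgeRange(String json) {
        int field = json.indexOf("\"age_group\"");
        int start = json.indexOf('"', json.indexOf(':', field)) + 1;
        return json.substring(start, json.indexOf('"', start));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "age-range-counter");  // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        KStreamBuilder builder = new KStreamBuilder();

        // Base table: customer ID -> JSON string, backed by the compacted topic.
        KTable<String, String> dataKTable =
            builder.table(Serdes.String(), Serdes.String(), "compacted-topic-1", "users-json");

        // Re-key each customer record by its age range, then count per range.
        KTable<String, Long> countKTablePerRange = dataKTable
            .groupBy((key, value) -> KeyValue.pair(getAgeRange(value), value),
                     Serdes.String(), Serdes.String())
            .count("someStoreName");

        // Publish the counts to an output topic (name is an assumption).
        countKTablePerRange.to(Serdes.String(), Serdes.Long(), "age-range-counts-output");

        new KafkaStreams(builder, props).start();
    }
}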

Update

About the 4 values in the repartition topic: that's correct. Each update to the "base KTable" writes a record for its "old value" and its "new value". This is required to update the downstream KTable correctly. The old value must be removed from one count and the new value must be added to another count. Because your (count) KTable is potentially distributed (i.e., shared over multiple parallel running app instances), both records (old and new) might end up on different instances because they might have different keys, so they must be sent as two independent records. (The record format should be more complex than what you show in your question, though.)

This also explains why you need a subtractor and an adder. The subtractor removes the old record from the aggregation result, while the adder adds the new record to the aggregation result.
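To make it concrete when each callback fires, the same count can be spelled out as an equivalent reduce with an explicit adder and subtractor. This is only a sketch; mapping every customer record to the value 1L is an illustrative choice, not the question's original groupBy:

KTable<String, Long> countKTablePerRange = dataKTable
    .groupBy((key, value) -> KeyValue.pair(getAgeRange(value), 1L),
             Serdes.String(), Serdes.Long())
    .reduce(
        (aggValue, newValue) -> aggValue + newValue,  // adder: receives the record keyed by the NEW age range
        (aggValue, oldValue) -> aggValue - oldValue,  // subtractor: receives the record keyed by the OLD age range
        "age-range-counts");

// When Alice moves from "18-24" to "25-30", her update produces two records
// on the repartition topic: ("18-24", 1) is routed to the subtractor
// (18-24: 3 -> 2), and ("25-30", 1) is routed to the adder (25-30: 1 -> 2).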

Still not sure why you don't see the correct count in the result. How many instances do you run? Maybe try to disable the KTable cache by setting cache.max.bytes.buffering=0 in StreamsConfig.
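For reference, disabling the cache would look roughly like this (the application ID and broker address are placeholders):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "age-range-counter");  // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// Disable record caching so every single update is forwarded downstream,
// instead of only the latest value per key at commit/flush time.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);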

