Kafka Streams - updating aggregations on KTable


Question

I have a KTable with data that looks like this (key => value), where keys are customer IDs, and values are small JSON objects containing some customer data:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

I'd like to do some aggregations on this KTable, and basically keep a count of the number of records for each age_group. The desired KTable data would look like this:

"18-24" => 3
"25-30" => 1

Let's say Alice, who is in the 18-24 group above, has a birthday that puts her in the new age group. The state store backing the first KTable should now look like this:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

And I'd like the resulting aggregated KTable to reflect this, e.g.:

"18-24" => 2
"25-30" => 2

I may be overgeneralizing the issue described here:

"In Kafka Streams there is no such thing as a final aggregation... Depending on your use case, manual de-duplication would be a way to resolve the issue."

But I have only been able to calculate a running total so far, e.g. Alice's birthday would be interpreted as:

"18-24" => 3 # Old Alice record still gets counted here
"25-30" => 2 # New Alice record gets counted here as well


Edit: Here is some additional behavior that I noticed that seems unexpected.

The topology I'm using looks like:

dataKTable = builder.table("compacted-topic-1", "users-json")
    .groupBy((key, value) -> KeyValue.pair(getAgeRange(value), key))
    .count("age-range-counts");


1) Empty State

Now, from the initial, empty state, everything looks like this:

compacted-topic-1
(empty)


dataKTable
(empty)


// groupBy()
Repartition topic: $APP_ID-age-range-counts-repartition
(empty)

// count()
age-range-counts state store
(empty)


2) Send a couple of messages

Now, let's send a couple of messages to compacted-topic-1, which is streamed as the KTable above. Here is what happens:

compacted-topic-1
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

dataKTable
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }


// groupBy()
// why does this generate 4 events???
Repartition topic: $APP_ID-age-range-counts-repartition
18-24 => 3
18-24 => 3
18-24 => 4
18-24 => 4

// count()
age-range-counts state store
18-24 => 0


So I'm wondering:

  • Is what I'm trying to do even possible using Kafka Streams 0.10.1 or 0.10.2? I've tried using groupBy and count in the DSL, but maybe I need to use something like reduce?
  • Also, I'm having a little trouble understanding the circumstances that lead to the add reducer and the subtract reducer being called, so any clarification around any of these points will be greatly appreciated.

Answer

If you have your original KTable containing id -> Json data (let's call it dataKTable) you should be able to get what you want via

KTable<String, Long> countKTablePerRange
    = dataKTable.groupBy(/* map your age-range to be the key */)
                .count("someStoreName");

This should work for all versions of Kafka's Streams API.

Update

About the 4 values in the repartition topic: that's correct. Each update to the "base KTable" writes a record for its "old value" and its "new value". This is required to update the downstream KTable correctly: the old value must be removed from one count and the new value must be added to another count. Because your (count) KTable is potentially distributed (i.e., shared over multiple parallel running app instances), the two records (old and new) might have different keys and thus end up at different instances, so they must be sent as two independent records. (The record format is more complex than what you show in your question, though.)

This also explains why you need a subtractor and an adder: the subtractor removes the old record from the aggregation result, while the adder adds the new record to it.
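
count() is just a built-in special case of aggregate(). A minimal sketch of the same count written with an explicit adder and subtractor (the store name and the Serdes.Long() serde are illustrative):

KTable<String, Long> countKTablePerRange
    = dataKTable.groupBy(/* map your age-range to be the key */)
                .aggregate(
                    () -> 0L,                                // initializer
                    (ageRange, value, count) -> count + 1L,  // adder: a record enters this group
                    (ageRange, value, count) -> count - 1L,  // subtractor: a record leaves this group
                    Serdes.Long(),                           // org.apache.kafka.common.serialization.Serdes
                    "someStoreName");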

Still not sure why you don't see the correct count in the result. How many instances do you run? Maybe try disabling the KTable cache by setting cache.max.bytes.buffering=0 in StreamsConfig.
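
Disabling the cache would look something like this (the application id and bootstrap servers are placeholders):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "age-range-counter");  // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// Forward every update downstream immediately instead of batching them
// in the record cache (setting available since 0.10.1):
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);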
