TopologyTestDriver sending incorrect message on KTable aggregations

Question

I have a topology that aggregates on a KTable. This is a generic method I created to build this topology on different topics I have.

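```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;

// Re-keys every row by getKeyFunction(value) and collects the values of each new key into a Set.
public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
        Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
    return table
            .groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
                    Serialized.with(keySerde, valueSerde))
            .aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
                // adder: remove first so an equal element is replaced by the newest value
                agg.remove(newValue);
                agg.add(newValue);
                return agg;
            }, (key, oldValue, agg) -> {
                // subtractor: drop the old value from the group it belonged to
                agg.remove(oldValue);
                return agg;
            }, Materialized.with(keySerde, aggregatedSerde));
}
```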

This works pretty well when using Kafka, but not when testing via `TopologyTestDriver`.

In both scenarios, when I get an update, the subtractor is called first and then the adder is called. The problem is that when using the TopologyTestDriver, two messages are sent out for each update: one after the subtractor call and another one after the adder call. Worse, the message sent after the subtractor and before the adder reflects an incorrect intermediate state.
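To make the call order concrete, here is a plain-Java replay (no Kafka involved) of what the aggregation does for one update whose group key does not change. It is a sketch that assumes the same remove/add logic as `groupTable` above; the class name `IntermediateStateDemo` and the values `g1`, `x`, `y`, `y2` are hypothetical, and it models only the order of the subtractor and adder calls, not Kafka's internals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java simulation (no Kafka): replays the subtractor-then-adder sequence
// for a Set-based aggregation and records the aggregate value after each step.
public class IntermediateStateDemo {

    // Aggregate state per group key, as groupTable keeps it (a HashSet per key).
    private final Map<String, Set<String>> store = new HashMap<>();
    // Every "group key = aggregate" snapshot that would be forwarded, in order.
    private final List<String> emitted = new ArrayList<>();

    // Adder: same remove-then-add logic as in groupTable.
    public void add(String groupKey, String value) {
        Set<String> agg = store.computeIfAbsent(groupKey, k -> new HashSet<>());
        agg.remove(value);
        agg.add(value);
        emitted.add(groupKey + "=" + new TreeSet<>(agg)); // TreeSet only for stable printing
    }

    // Subtractor: removes the old value from its group.
    public void subtract(String groupKey, String value) {
        Set<String> agg = store.computeIfAbsent(groupKey, k -> new HashSet<>());
        agg.remove(value);
        emitted.add(groupKey + "=" + new TreeSet<>(agg));
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        IntermediateStateDemo demo = new IntermediateStateDemo();
        demo.add("g1", "x");       // new row, no old value: adder only
        demo.add("g1", "y");       // new row, no old value: adder only
        // A row changes from "y" to "y2"; both values map to group key "g1".
        demo.subtract("g1", "y");  // subtractor runs first...
        demo.add("g1", "y2");      // ...then the adder
        demo.emitted().forEach(System.out::println);
    }
}
```

The third snapshot, `g1=[x]`, is the intermediate state between the subtractor and the adder that the question describes.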

Can anyone else confirm that this is a bug? I've tested this with both Kafka 2.0.1 and 2.1.0.


I've created a test case on GitHub that illustrates the problem: https://github.com/mulho/topology-testcase

Answer

It is expected behavior that there are two output records (one "minus" record, and one "plus" record). It's a little tricky to understand how it works, so let me try to explain.

Assume you have the following input table:

 key |  value
-----+---------
  A  |  <10,2>
  B  |  <10,3>
  C  |  <11,4>

KTable#groupBy() 上,您将值的第一部分提取为新键(即 1011),然后再提取对聚合中的第二部分(即 234)求和.因为 AB 记录都将 10 作为新键,所以你可以求和 2+3 并且你也为新密钥 11 求和 4.结果表将是:

In KTable#groupBy() you extract the first part of the value as the new key (i.e., 10 or 11) and later sum the second part (i.e., 2, 3, 4) in the aggregation. Because records A and B both have 10 as their new key, you would sum 2+3 for key 10, and you would sum 4 for the new key 11. The result table would be:

 key |  value
-----+---------
  10 |  5
  11 |  4
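The result table can be checked with a small batch computation in plain Java. This sketch recomputes the grouped sums from scratch, which is the result the incremental adder/subtractor must agree with; `GroupBySumExample` and its `Pair` class are hypothetical stand-ins for the `<first,second>` values in the tables.

```java
import java.util.Map;
import java.util.TreeMap;

// Batch (non-incremental) version of the example: re-key each input row by the
// first component of its value, then sum the second component per new key.
public class GroupBySumExample {

    // Hypothetical stand-in for the <first,second> values in the input table.
    static final class Pair {
        final int first;
        final int second;

        Pair(int first, int second) {
            this.first = first;
            this.second = second;
        }
    }

    // Equivalent of groupBy(value.first) followed by a sum over value.second.
    public static Map<Integer, Integer> groupAndSum(Map<String, Pair> table) {
        Map<Integer, Integer> result = new TreeMap<>();
        for (Pair p : table.values()) {
            result.merge(p.first, p.second, Integer::sum);
        }
        return result;
    }

    // The input table from the example: A=<10,2>, B=<10,3>, C=<11,4>.
    public static Map<String, Pair> inputTable() {
        Map<String, Pair> table = new TreeMap<>();
        table.put("A", new Pair(10, 2));
        table.put("B", new Pair(10, 3));
        table.put("C", new Pair(11, 4));
        return table;
    }

    public static void main(String[] args) {
        System.out.println(groupAndSum(inputTable())); // {10=5, 11=4}
    }
}
```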

Now assume that an update record <B,<11,5>> changes the original input KTable to:

 key |  value
-----+---------
  A  |  <10,2>
  B  |  <11,5>
  C  |  <11,4>

Thus, the new result table should sum up 5+4 for 11 and 2 for 10:

 key |  value
-----+---------
  10 |  2
  11 |  9

If you compare the first result table with the second, you might notice that both rows got updated. The old B|<10,3> record is subtracted from 10|5, resulting in 10|2, and the new B|<11,5> record is added to 11|4, resulting in 11|9.

These are exactly the two output records you see. The first output record (after the subtractor is executed) updates the first row: it subtracts the old value, which is no longer part of that aggregation result. The second record adds the new value to the other aggregation result. In our example, the subtract record would be <10,<null,<10,3>>> and the add record would be <11,<<11,5>,null>>. The format of these records is <key,<plus,minus>>; note that the subtract record only sets the minus part, while the add record only sets the plus part.
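The two records can be replayed in plain Java. In this sketch, `Change` is a hypothetical stand-in for the `<plus,minus>` pair described above (it is not Kafka's internal class), and the aggregate is the running sum from the example. The minus record is produced before the plus record, matching the subtractor-then-adder order from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of how groupBy().aggregate() handles the update of row B
// from <10,3> to <11,5>: one minus record and one plus record, possibly with different keys.
public class PlusMinusSimulation {

    // Value pair <first,second> as in the tables above.
    static final class Pair {
        final int first, second;
        Pair(int first, int second) { this.first = first; this.second = second; }
    }

    // Hypothetical repartitioned record: new group key plus a <plus,minus> change.
    static final class Change {
        final int key; final Pair plus; final Pair minus;
        Change(int key, Pair plus, Pair minus) { this.key = key; this.plus = plus; this.minus = minus; }
    }

    // Minus record keyed by the old group key, then plus record keyed by the new one.
    static List<Change> groupByUpdate(Pair oldValue, Pair newValue) {
        List<Change> out = new ArrayList<>();
        if (oldValue != null) out.add(new Change(oldValue.first, null, oldValue));
        if (newValue != null) out.add(new Change(newValue.first, newValue, null));
        return out;
    }

    // Applies one record to the sum aggregate and returns the emitted "key|value" row.
    static String apply(Map<Integer, Integer> sums, Change c) {
        int current = sums.getOrDefault(c.key, 0);
        if (c.minus != null) current -= c.minus.second; // subtractor
        if (c.plus != null) current += c.plus.second;   // adder
        sums.put(c.key, current);
        return c.key + "|" + current;
    }

    public static List<String> run() {
        Map<Integer, Integer> sums = new TreeMap<>();
        sums.put(10, 5); // result table before the update
        sums.put(11, 4);
        List<String> emitted = new ArrayList<>();
        for (Change c : groupByUpdate(new Pair(10, 3), new Pair(11, 5))) {
            emitted.add(apply(sums, c));
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [10|2, 11|9]
    }
}
```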

Final remark: it is not possible to put the plus and minus records together, because the keys of the plus and minus records can be different (11 and 10 in our example) and thus might go to different partitions. This implies that the plus and minus operations might be executed by different machines, so it is not possible to emit only one record that contains both the plus and the minus part.
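A practical consequence for tests: if an assertion only cares about the final table state, one option is to collapse the output records by key, letting later records overwrite earlier ones, which is how a KTable changelog is read. The helper below is a hypothetical sketch over plain key/value entries already read from the output topic; it does not use the TopologyTestDriver API itself. Newer Kafka releases (2.4 and later) provide `TestOutputTopic#readKeyValuesToMap()` for the same purpose.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Collapses a KTable changelog (output records, in order) into the final table
// state: a later record for a key overwrites any earlier record for that key.
public class ChangelogToTable {

    public static <K, V> Map<K, V> toTable(List<Map.Entry<K, V>> changelog) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : changelog) {
            if (record.getValue() == null) {
                table.remove(record.getKey()); // a null value is a tombstone (delete)
            } else {
                table.put(record.getKey(), record.getValue());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Output records from the example: initial results, then the minus and plus updates.
        List<Map.Entry<Integer, Integer>> changelog = Arrays.<Map.Entry<Integer, Integer>>asList(
                new SimpleEntry<>(10, 5), new SimpleEntry<>(11, 4),
                new SimpleEntry<>(10, 2), new SimpleEntry<>(11, 9));
        System.out.println(toTable(changelog)); // {10=2, 11=9}
    }
}
```

The intermediate records are still emitted; this only changes what the test compares against.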
