TopologyTestDriver sending incorrect message on KTable aggregations

Question

I have a topology that aggregates on a KTable. I created the following generic method to build this topology for several of my topics.

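import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;

// Regroups a KTable by a key derived from each value and collects each group's values into a Set.
public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
        Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
    return table
            .groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
                    Serialized.with(keySerde, valueSerde))
            .aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
                // Adder: remove any element equal to newValue first, so the new instance replaces it.
                agg.remove(newValue);
                agg.add(newValue);
                return agg;
            }, (key, oldValue, agg) -> {
                // Subtractor: drop the value that no longer belongs to this group.
                agg.remove(oldValue);
                return agg;
            }, Materialized.with(keySerde, aggregatedSerde));
}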

This works pretty well when using Kafka, but not when testing via `TopologyTestDriver`.

In both scenarios, when I get an update, the subtractor is called first, and then the adder is called. The problem is that when using the TopologyTestDriver, two messages are sent out for each update: one after the subtractor call, and another one after the adder call. On top of that, the message sent after the subtractor and before the adder is in an incorrect state.

Can anyone else confirm that this is a bug? I've tested this with Kafka versions 2.0.1 and 2.1.0.


I created a test case on GitHub to illustrate the issue: https://github.com/mulho/topology-testcase

Recommended answer

It is expected behavior that there are two output records (one "minus" record, and one "plus" record). It's a little tricky to understand how it works, so let me try to explain.

Assume you have the following input table:

 key |  value
-----+---------
  A  |  <10,2>
  B  |  <10,3>
  C  |  <11,4>

On KTable#groupBy() you extract the first part of the value as the new key (i.e., 10 or 11) and later sum the second part (i.e., 2, 3, 4) in the aggregation. Because records A and B both have 10 as their new key, you would sum 2+3 for new key 10, and the sum for new key 11 would be just 4. The result table would be:

 key |  value
-----+---------
  10 |  5
  11 |  4
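
The regrouping above can be reproduced without Kafka. The following is a minimal plain-Java sketch, not the Kafka Streams API: the class name GroupBySumSketch and its methods are hypothetical, and each value <newKey, amount> is modelled as a two-element int array.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for KTable#groupBy() followed by a sum; no Kafka classes are used.
class GroupBySumSketch {

    // Input table from the example: key -> <newKey, amount>.
    static Map<String, int[]> inputTable() {
        Map<String, int[]> table = new LinkedHashMap<>();
        table.put("A", new int[] {10, 2});
        table.put("B", new int[] {10, 3});
        table.put("C", new int[] {11, 4});
        return table;
    }

    // Re-key every row by the first part of its value and sum the second part per new key.
    static Map<Integer, Integer> groupAndSum(Map<String, int[]> table) {
        Map<Integer, Integer> result = new TreeMap<>();
        for (int[] value : table.values()) {
            result.merge(value[0], value[1], Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(groupAndSum(inputTable())); // prints {10=5, 11=4}
    }
}
```

This sketch recomputes the whole result from scratch, whereas Kafka Streams maintains the result incrementally as input rows change.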

Now assume that an update record <B,<11,5>> changes the original input KTable to:

 key |  value
-----+---------
  A  |  <10,2>
  B  |  <11,5>
  C  |  <11,4>

Thus, the new result table should sum up 5+4 for 11 and 2 for 10:

 key |  value
-----+---------
  10 |  2
  11 |  9

If you compare the first result table with the second, you might notice that both rows got updated. The old B|<10,3> record is subtracted from 10|5, resulting in 10|2, and the new B|<11,5> record is added to 11|4, resulting in 11|9.

These are exactly the two output records you see. The first output record (emitted after the subtractor is executed) updates the first row: it subtracts the old value, which is no longer part of that aggregation result. The second record adds the new value to the aggregation result. In our example, the subtract record would be <10,<null,<10,3>>> and the add record would be <11,<<11,5>,null>>. The format of these records is <key, <plus,minus>>; note that the subtract record sets only the minus part, while the add record sets only the plus part.
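
To make the two-step update concrete, here is a minimal plain-Java sketch that replays the example. It is not Kafka Streams code: the class SubtractThenAddSketch, its update method, and the "key=sum" strings used to represent emitted records are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simulates an incrementally maintained sum per group that emits one record per changed row.
class SubtractThenAddSketch {
    // Current aggregation result: grouped key -> sum.
    final Map<Integer, Integer> result = new TreeMap<>();
    // Every change to the result table, in emission order, written as "key=newSum".
    final List<String> emitted = new ArrayList<>();

    // Applies a change of one input row; oldValue is null when the row is inserted for the first time.
    void update(int[] oldValue, int[] newValue) {
        if (oldValue != null) {
            // Subtract step: remove the old contribution from the old group and emit that row.
            int sum = result.get(oldValue[0]) - oldValue[1];
            result.put(oldValue[0], sum);
            emitted.add(oldValue[0] + "=" + sum);
        }
        if (newValue != null) {
            // Add step: add the new contribution to the (possibly different) new group and emit that row.
            int sum = result.merge(newValue[0], newValue[1], Integer::sum);
            emitted.add(newValue[0] + "=" + sum);
        }
    }

    // Loads rows A, B, C and then replays the update of B from <10,3> to <11,5>.
    static List<String> replayExample() {
        SubtractThenAddSketch sketch = new SubtractThenAddSketch();
        sketch.update(null, new int[] {10, 2}); // A
        sketch.update(null, new int[] {10, 3}); // B
        sketch.update(null, new int[] {11, 4}); // C
        sketch.emitted.clear();                 // keep only the records caused by the update
        sketch.update(new int[] {10, 3}, new int[] {11, 5});
        return sketch.emitted;
    }

    public static void main(String[] args) {
        System.out.println(replayExample()); // prints [10=2, 11=9]
    }
}
```

A single input update therefore produces two output records in this model, one per affected row of the result table.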

Final remark: it is not possible to put the plus and minus records together, because the keys of the plus and minus records can be different (in our example 11 and 10), and thus the records might go into different partitions. This implies that the plus and minus operations might be executed by different machines, so it's not possible to emit only one record that contains both the plus and the minus part.
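
As a rough illustration of why the two keys can end up in different partitions, the sketch below maps a key to a partition with a simple modulo. This is an assumption made for illustration only: partitionFor is a hypothetical helper and is not the hash function that Kafka's default partitioner applies to serialized key bytes.

```java
// Hypothetical stand-in for a partitioner; Kafka's real partitioner hashes the serialized key bytes.
class PartitionSketch {

    // Maps a key to one of numPartitions partitions using a non-negative modulo of its hash code.
    static int partitionFor(int key, int numPartitions) {
        return Math.floorMod(Integer.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        // With two partitions, the minus key (10) and the plus key (11) are assigned differently.
        System.out.println(partitionFor(10, 2)); // prints 0
        System.out.println(partitionFor(11, 2)); // prints 1
    }
}
```

Under any such key-based assignment, nothing guarantees that the old key and the new key of an updated row share a partition.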
