Kafka Stream groupBy behavior: many intermediate outputs/updates for an aggregation

Problem Description

I'm experimenting with Kafka Streams to aggregate some attributes of People.

I have a Kafka Streams test like this:

    val factory = new ConsumerRecordFactory[Array[Byte], Character](
      "input", new ByteArraySerializer(), new CharacterSerializer())
    var i = 0
    while (i != 5) {
      testDriver.pipeInput(
        factory.create("input", Character(123, 12), 15 * 10000L))
      i += 1
    }
    val output = testDriver.readOutput....

I'm trying to group the values by key like this:

    streamBuilder.stream[Array[Byte], Character](inputKafkaTopic)
      .filter((key, _) => key == null)
      .mapValues(character => PersonInfos(character.id, character.id2, character.age)) // case class
      .groupBy((_, value) => CharacterInfos(value.id, value.id2)) // case class
      .count()
      .toStream
      .print(Printed.toSysOut[CharacterInfos, Long])

When I run the code, I get this:

    [KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 1
    [KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 2
    [KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 3
    [KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 4
    [KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 5

Why am I getting 5 rows instead of just one line with CharacterInfos and the count? Doesn't groupBy just change the key?

Recommended Answer

If you use the TopologyTestDriver, caching is effectively disabled, and thus every input record will always produce an output record. This is by design, because caching implies non-deterministic behavior, which makes it very hard to write an actual unit test.

If you deploy the code in a real application, the behavior will be different and caching will reduce the output load -- which intermediate results you will get is not defined (i.e., non-deterministic); compare Michael Noll's answer.
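For illustration, here is a minimal sketch of the two settings that govern caching in a real deployment; the application id and bootstrap servers are placeholders, not taken from the question:

    import java.util.Properties
    import org.apache.kafka.streams.StreamsConfig

    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "character-count-app") // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")   // placeholder
    // A non-zero cache size combined with the commit interval bounds how many
    // intermediate updates are deduplicated before being forwarded downstream.
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, String.valueOf(10 * 1024 * 1024))
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "30000")

With caching enabled like this, several consecutive updates to the same key can be collapsed into one downstream record between commits, which is exactly why the number of emitted intermediate results is non-deterministic.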

For your unit test, it should actually not really matter, and you can either test for all output records (i.e., all intermediate results), or put all output records into a key-value Map and only test for the last emitted record per key (if you don't care about the intermediate results).
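As a sketch of the second approach, assuming an output topic named "output" and a hypothetical CharacterInfosDeserializer matching the question's serdes:

    import scala.collection.mutable
    import org.apache.kafka.common.serialization.LongDeserializer

    // Drain everything the driver produced and keep only the last value per key.
    val lastPerKey = mutable.Map.empty[CharacterInfos, Long]
    var record = testDriver.readOutput("output",
      new CharacterInfosDeserializer(), new LongDeserializer()) // hypothetical key deserializer
    while (record != null) {
      lastPerKey(record.key()) = record.value()
      record = testDriver.readOutput("output",
        new CharacterInfosDeserializer(), new LongDeserializer())
    }
    // All five intermediate updates collapse to the final count per key.
    assert(lastPerKey(CharacterInfos(123, 12)) == 5L)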

Furthermore, you could use the suppress() operator to get fine-grained control over which output messages you get. suppress(), in contrast to caching, is fully deterministic, and thus writing a unit test works well. However, note that suppress() is event-time driven, and thus, if you stop sending new records, time does not advance and suppress() does not emit data. For unit testing, this is important to consider, because you might need to send some additional "dummy" data to trigger the output you actually want to test for. For more details on suppress(), check out this blog post: https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers
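A rough sketch of what that could look like: suppress() with untilWindowCloses needs a windowed aggregation so it knows when a result is final, and the five-minute window below is an arbitrary assumption, not part of the question (the implicit serdes are assumed to be the same as in the original topology):

    import java.time.Duration
    import org.apache.kafka.streams.kstream.{Printed, Suppressed, TimeWindows, Windowed}
    import org.apache.kafka.streams.kstream.Suppressed.BufferConfig

    streamBuilder.stream[Array[Byte], Character](inputKafkaTopic)
      .filter((key, _) => key == null)
      .groupBy((_, value) => CharacterInfos(value.id, value.id2))
      .windowedBy(TimeWindows.of(Duration.ofMinutes(5))) // assumed window size
      .count()
      // Forward a single, final count per key and window once the window closes.
      .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
      .toStream
      .print(Printed.toSysOut[Windowed[CharacterInfos], Long])

In a test you would then pipe one extra record with a timestamp past the window end, so that event time advances and the suppressed result is actually released.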
