Kafka Streams groupBy behavior: many intermediate outputs/updates for an aggregation

Problem Description

I'm trying to play with Kafka Streams to aggregate some attributes of People.

I have a Kafka Streams test like this:

    // Create a record factory for the "input" topic and pipe five
    // identical records through the TopologyTestDriver.
    val factory = new ConsumerRecordFactory[Array[Byte], Character](
      "input", new ByteArraySerializer(), new CharacterSerializer())
    var i = 0
    while (i != 5) {
      testDriver.pipeInput(
        factory.create("input", Character(123, 12), 15 * 10000L))
      i += 1
    }
    val output = testDriver.readOutput....

I'm trying to group the values by key like this:

    streamBuilder.stream[Array[Byte], Character](inputKafkaTopic)
      .filter((key, _) => key == null)
      .mapValues(character => PersonInfos(character.id, character.id2, character.age)) // case class
      .groupBy((_, value) => CharacterInfos(value.id, value.id2)) // case class
      .count()
      .toStream
      .print(Printed.toSysOut[CharacterInfos, Long])

When I run the code, I get this:

    [KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 1
    [KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 2
    [KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 3
    [KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 4
    [KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 5

Why am I getting 5 rows instead of just one line with the CharacterInfos and the count? Doesn't groupBy just change the key?

Recommended Answer

If you use the TopologyTestDriver, caching is effectively disabled, and thus every input record will always produce an output record. This is by design, because caching implies non-deterministic behavior, which makes it very hard to write an actual unit test.

If you deploy the code in a real application, the behavior will be different and caching will reduce the output load -- which intermediate results you will get is not defined (i.e., non-deterministic); compare Michael Noll's answer.

For your unit test, it should actually not really matter: you can either test for all output records (i.e., all intermediate results), or put all output records into a key-value Map and only test the last emitted record per key (if you don't care about the intermediate results).
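
As an illustration of the Map approach, here is a minimal sketch. It assumes the testDriver setup from the question, an output topic named "output", and hypothetical characterInfosDeserializer/longDeserializer instances for the CharacterInfos keys and Long counts:

    import scala.collection.mutable

    // Drain every record the driver produced; the Map keeps only the
    // last emitted count per key, discarding intermediate updates.
    val lastCountPerKey = mutable.Map.empty[CharacterInfos, Long]
    var record = testDriver.readOutput("output",
      characterInfosDeserializer, longDeserializer) // hypothetical deserializers
    while (record != null) {
      lastCountPerKey.put(record.key(), record.value())
      record = testDriver.readOutput("output",
        characterInfosDeserializer, longDeserializer)
    }
    // After piping five records with the same key, only the final count remains.
    assert(lastCountPerKey(CharacterInfos(123, 12)) == 5L)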

Furthermore, you could use the suppress() operator to get fine-grained control over which output messages you get. In contrast to caching, suppress() is fully deterministic, and thus writing a unit test works well. However, note that suppress() is event-time driven, so if you stop sending new records, time does not advance and suppress() does not emit data. For unit testing, this is important to consider, because you might need to send some additional "dummy" data to trigger the output you actually want to test for. For more details on suppress(), check out this blog post: https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers
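
To make the suppress() idea concrete, here is a minimal sketch of a windowed variant of the aggregation above. The one-minute window is an assumption for illustration only, and it presumes a Kafka Streams version whose Scala API exposes suppress(), with the implicit serdes in scope:

    import java.time.Duration
    import org.apache.kafka.streams.kstream.Suppressed.BufferConfig
    import org.apache.kafka.streams.kstream.{Printed, Suppressed, TimeWindows, Windowed}

    streamBuilder.stream[Array[Byte], Character](inputKafkaTopic)
      .groupBy((_, value) => CharacterInfos(value.id, value.id2))
      .windowedBy(TimeWindows.of(Duration.ofMinutes(1))) // assumed window size
      .count()
      // Hold back all updates until the window closes, emitting one final
      // count per key and window; event time must advance past the window
      // end (plus grace), e.g. via later "dummy" records in a test.
      .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
      .toStream
      .print(Printed.toSysOut[Windowed[CharacterInfos], Long])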
