Events that should be emitted by a KTable


Question

I am trying to test a topology that has a KTable as its last node. My test uses a full-blown Kafka cluster (through Confluent's Docker images), so I am not using the TopologyTestDriver.

My topology has input of key-value types String -> Customer and output of String -> CustomerMapped. The serdes, schemas and integration with Schema Registry all work as expected.

I am using Scala, Kafka 2.2.0, Confluent Platform 5.2.1 and kafka-streams-scala. My topology, as simplified as possible, looks something like this:

val otherBuilder = new StreamsBuilder()

otherBuilder
  .table[String, Customer](source)
  .mapValues(c => CustomerMapped(c.surname, c.age))
  .toStream.to(target)

(all implicit serdes, Produced, Consumed, etc. are the defaults and are resolved correctly)

My test consists of sending a few records (data) to the source topic synchronously and without pause, then reading back from the target topic and comparing the results with expected:

val data: Seq[(String, Customer)] = Vector(
   "key1" -> Customer(0, "Obsolete", "To be overridden", 0),
   "key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
   "key1" -> Customer(1, "Billy", "The Man", 32),
   "key2" -> Customer(2, "Tommy", "The Guy", 31),
   "key3" -> Customer(3, "Jenny", "The Lady", 40)
)
val expected = Vector(
   "key1" -> CustomerMapped("The Man", 32),
   "key2" -> CustomerMapped("The Guy", 31),
   "key3" -> CustomerMapped("The Lady", 40)
)

I build the Kafka Streams application, setting, among other properties, the following two:

p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000")
val s: Long = 50L * 1024 * 1024
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, s.toString)
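For context, a fuller configuration might look like the sketch below (plain Scala, no Kafka dependency: string keys are used in place of the StreamsConfig constants, and the application id and bootstrap address are assumptions; only the two cache-related settings come from the question):

```scala
import java.util.Properties

val p = new Properties()
// Assumed boilerplate settings, not from the question:
p.put("application.id", "ktable-emission-test")    // StreamsConfig.APPLICATION_ID_CONFIG
p.put("bootstrap.servers", "localhost:9092")       // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
// The two settings under discussion:
p.put("commit.interval.ms", "5000")                // StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
val cacheBytes: Long = 50L * 1024 * 1024           // 50 MB
p.put("cache.max.bytes.buffering", cacheBytes.toString) // StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
```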

So I expect the KTable to use caching, with an interval of 5 seconds between commits and a cache size of 50 MB (more than enough for my scenario).

My problem is that the results I read back from the target topic always contain multiple entries for key1. I would have expected no events to be emitted for the records with Obsolete and Obsolete2. The actual output is:

Vector(
    "key1" -> CustomerMapped("To be overridden", 0),
    "key1" -> CustomerMapped("To be overridden2", 0),
    "key1" -> CustomerMapped("The Man", 32),
    "key2" -> CustomerMapped("The Guy", 31),
    "key3" -> CustomerMapped("The Lady", 40)
)

One final thing to mention: this test used to work as expected until I updated Kafka from 2.1.0 to 2.2.0. I verified this by downgrading my application again.

I am quite confused: can anyone point out whether something changed in the behaviour of KTables in the 2.2.x versions? Or are there new settings I now have to set to control the emission of events?

Answer

In Kafka 2.2, an optimization was introduced to reduce the resource footprint of Kafka Streams. A KTable is not necessarily materialized if it's not required for the computation. This holds for your case, because mapValues() can be computed on-the-fly. Because the KTable is not materialized, there is no cache and thus each input record produces one output record.
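The effect can be illustrated without Kafka at all. With no state store, the table step degenerates into a per-record map; a materialized, cached table would instead (roughly, at commit time) forward only the latest value per key. A plain-Scala sketch of both behaviours, reusing the question's test data:

```scala
case class Customer(id: Int, name: String, surname: String, age: Int)
case class CustomerMapped(surname: String, age: Int)

val data = Vector(
  "key1" -> Customer(0, "Obsolete", "To be overridden", 0),
  "key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
  "key1" -> Customer(1, "Billy", "The Man", 32),
  "key2" -> Customer(2, "Tommy", "The Guy", 31),
  "key3" -> Customer(3, "Jenny", "The Lady", 40)
)

// Not materialized (Kafka 2.2 behaviour): every input record is
// transformed and forwarded individually -- a plain map.
val observed = data.map { case (k, c) => k -> CustomerMapped(c.surname, c.age) }

// Materialized and cached (the behaviour the test expected): only the
// latest value per key survives, in first-seen key order.
val compacted = data
  .foldLeft(Vector.empty[(String, Customer)]) { (acc, kv) =>
    if (acc.exists(_._1 == kv._1)) acc.map(e => if (e._1 == kv._1) kv else e)
    else acc :+ kv
  }
  .map { case (k, c) => k -> CustomerMapped(c.surname, c.age) }
```

The first sequence reproduces the five-record output the question observed; the second reproduces the three-record expected vector.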

Compare: https://issues.apache.org/jira/browse/KAFKA-6036

If you want to enforce KTable materialization, you can pass Materialized.as("someStoreName") into the StreamsBuilder#table() method.
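Applied to the question's topology, that would look roughly like the sketch below (kafka-streams-scala API, not runnable standalone; "customers-store" is an arbitrary store name, and the implicit serdes are assumed to be in scope as in the question):

```scala
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.Materialized

// Forcing materialization gives the table a state store, and with it
// the record cache, restoring the pre-2.2 deduplication behaviour.
otherBuilder
  .table[String, Customer](source,
    Materialized.as[String, Customer, ByteArrayKeyValueStore]("customers-store"))
  .mapValues(c => CustomerMapped(c.surname, c.age))
  .toStream.to(target)
```

Note that even with caching, emitting only the final value per key is best-effort: the cache is flushed on commit, so records for the same key arriving in different commit intervals will still each produce an output.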
