Events that should be emitted by a KTable
Question
I am trying to test a topology whose last node is a KTable. My test uses a full-blown Kafka cluster (through Confluent's Docker images), so I am not using the TopologyTestDriver.
My topology has input of key-value types String -> Customer and output of String -> CustomerMapped. The serdes, schemas and integration with Schema Registry all work as expected.
I am using Scala, Kafka 2.2.0, Confluent Platform 5.2.1 and kafka-streams-scala. My topology, simplified as much as possible, looks something like this:
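import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
// Assumes implicit Serde[Customer] and Serde[CustomerMapped] (Avro) are in scope
val otherBuilder = new StreamsBuilder()
otherBuilder
  .table[String, Customer](source)
  .mapValues(c => CustomerMapped(c.surname, c.age))
  .toStream
  .to(target)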
(all implicit serdes, Produced, Consumed, etc. are the default ones and are found correctly)
My test consists of sending a few records (data) to the source topic synchronously and without pause, then reading back from the target topic and comparing the results with expected:
val data: Seq[(String, Customer)] = Vector(
  "key1" -> Customer(0, "Obsolete", "To be overridden", 0),
  "key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
  "key1" -> Customer(1, "Billy", "The Man", 32),
  "key2" -> Customer(2, "Tommy", "The Guy", 31),
  "key3" -> Customer(3, "Jenny", "The Lady", 40)
)
val expected = Vector(
  "key1" -> CustomerMapped("The Man", 32),
  "key2" -> CustomerMapped("The Guy", 31),
  "key3" -> CustomerMapped("The Lady", 40)
)
I build the Kafka Streams application with, among other settings, the following two:
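import java.util.Properties
import org.apache.kafka.streams.StreamsConfig
val p = new Properties() // other settings (application id, bootstrap servers, ...) omitted
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000") // commit (and flush caches) every 5 s
val s: Long = 50L * 1024 * 1024 // 50 MB record cache, shared by all stream threads
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, s.toString)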
So I expect the KTable to use caching, with a commit interval of 5 seconds and a cache size of 50 MB (more than enough for my scenario).
My problem is that the results I read back from the target topic always contain multiple entries for key1. I would have expected no events to be emitted for the records with Obsolete and Obsolete2. The actual output is:
Vector(
  "key1" -> CustomerMapped("To be overridden", 0),
  "key1" -> CustomerMapped("To be overridden2", 0),
  "key1" -> CustomerMapped("The Man", 32),
  "key2" -> CustomerMapped("The Guy", 31),
  "key3" -> CustomerMapped("The Lady", 40)
)
One final thing to mention: this test used to work as expected until I updated Kafka from 2.1.0 to 2.2.0. I verified this by downgrading my application again.
I am quite confused. Can anyone point out whether something changed in the behaviour of KTables in the 2.2.x versions? Or are there now new settings I have to set to control the emission of events?
Answer
In Kafka 2.2, an optimization was introduced to reduce the resource footprint of Kafka Streams. A KTable is not necessarily materialized if it is not required for the computation. This holds for your case, because mapValues() can be computed on the fly. Because the KTable is not materialized, there is no cache, and thus each input record produces one output record.
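To make the difference concrete, here is a minimal pure-Scala model of the two behaviours. It is not the Kafka Streams API: `KTableCacheModel`, `withCache`, `withoutCache` and `project` are hypothetical names, and the case classes are simplified stand-ins for the question's Avro types. The model assumes all five test records arrive within a single 5-second commit interval and that the cache never fills up; in a real application a flush can also be triggered by cache eviction, so deduplication is best-effort rather than guaranteed.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the question's Avro types.
final case class Customer(id: Int, name: String, surname: String, age: Int)
final case class CustomerMapped(surname: String, age: Int)

object KTableCacheModel {
  // Cached (materialized) table: updates arriving between two commits are
  // collapsed per key; only the latest value per key is forwarded on flush.
  def withCache[K, V](updates: Seq[(K, V)]): Vector[(K, V)] = {
    val latest = mutable.LinkedHashMap.empty[K, V]
    // Overwriting an existing key keeps the key's first-seen position.
    updates.foreach { case (k, v) => latest(k) = v }
    latest.toVector
  }

  // Uncached (non-materialized) table: every update is forwarded downstream.
  def withoutCache[K, V](updates: Seq[(K, V)]): Vector[(K, V)] = updates.toVector

  // Same projection as the question's mapValues step.
  def project(c: Customer): CustomerMapped = CustomerMapped(c.surname, c.age)
}
```

Feeding the question's data through `withCache` and mapping each value with `project` yields exactly the three-entry expected vector, while `withoutCache` forwards all five updates, which corresponds to the output observed on 2.2.0.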
Compare: https://issues.apache.org/jira/browse/KAFKA-6036
If you want to force KTable materialization, you can pass Materialized.as("someStoreName") into the StreamsBuilder#table() method.
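A sketch of that change follows. It assumes String keys and values instead of the question's Avro Customer types, and the topic names `customers` / `customers-mapped`, the store name `customers-store` and the object `MaterializedTableSketch` are hypothetical. It uses the kafka-streams-scala overload of `StreamsBuilder#table` that takes a `Materialized`. A store created with `Materialized.as` has caching enabled by default, so with a non-zero `cache.max.bytes.buffering` consecutive updates to a key within one commit interval can again be collapsed.

```scala
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.Materialized

object MaterializedTableSketch {
  def buildTopology(): Topology = {
    val builder = new StreamsBuilder()
    builder
      // Naming a store forces the source KTable to be materialized (and cached).
      .table[String, String](
        "customers",
        Materialized.as[String, String, ByteArrayKeyValueStore]("customers-store")
      )
      // Stand-in for the question's CustomerMapped projection.
      .mapValues(v => v.toUpperCase)
      .toStream
      .to("customers-mapped")
    builder.build()
  }
}
```

Printing `MaterializedTableSketch.buildTopology().describe()` should list `customers-store` next to the KTable source processor; without the `Materialized` argument no store is attached there in 2.2.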