Events that should be emitted by a KTable
Problem description
I am trying to test a topology that has a KTable as its last node. My test uses a full-blown Kafka cluster (through Confluent's Docker images), so I am not using the TopologyTestDriver.
My topology has input of key-value types String -> Customer and output of String -> CustomerMapped. The serdes, schemas and integration with Schema Registry all work as expected.
I am using Scala, Kafka 2.2.0, Confluent Platform 5.2.1 and kafka-streams-scala. My topology, simplified as much as possible, looks something like this:
val otherBuilder = new StreamsBuilder()
otherBuilder
  .table[String, Customer](source)
  .mapValues(c => CustomerMapped(c.surname, c.age))
  .toStream
  .to(target)
(All implicit serdes, Produced, Consumed, etc. are the default ones and are found correctly.)
My test consists of sending a few records (data) onto the source topic synchronously and without pause, then reading back from the target topic and comparing the results with expected:
val data: Seq[(String, Customer)] = Vector(
  "key1" -> Customer(0, "Obsolete", "To be overridden", 0),
  "key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
  "key1" -> Customer(1, "Billy", "The Man", 32),
  "key2" -> Customer(2, "Tommy", "The Guy", 31),
  "key3" -> Customer(3, "Jenny", "The Lady", 40)
)
val expected = Vector(
  "key1" -> CustomerMapped("The Man", 32),
  "key2" -> CustomerMapped("The Guy", 31),
  "key3" -> CustomerMapped("The Lady", 40)
)
I build the Kafka Streams application, setting, among other things, the following two properties:
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000")
val s: Long = 50L * 1024 * 1024
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, s.toString)
So I expect the KTable to use caching, having an interval of 5 seconds between commits and a cache size of 50MB (more than enough for my scenario).
My problem is that the results I read back from the target topic always contain multiple entries for key1. I would have expected no event to be emitted for the records with Obsolete and Obsolete2. The actual output is:
Vector(
  "key1" -> CustomerMapped("To be overridden", 0),
  "key1" -> CustomerMapped("To be overridden2", 0),
  "key1" -> CustomerMapped("The Man", 32),
  "key2" -> CustomerMapped("The Guy", 31),
  "key3" -> CustomerMapped("The Lady", 40)
)
One final thing to mention: this test worked as expected until I updated Kafka from 2.1.0 to 2.2.0. I verified this by downgrading my application again.
I am quite confused. Can anyone point out whether something changed in the behaviour of KTables in the 2.2.x versions? Or are there now new settings I have to set to control the emission of events?
Recommended answer
In Kafka 2.2, an optimization was introduced to reduce the resource footprint of Kafka Streams. A KTable is not necessarily materialized if materialization is not required for the computation. This holds for your case, because mapValues() can be computed on the fly. Because the KTable is not materialized, there is no cache, and thus each input record produces one output record.
Compare: https://issues.apache.org/jira/browse/KAFKA-6036
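To make the difference concrete, here is a toy model in plain Scala. It is not Kafka Streams code and does not reflect how the cache is implemented internally; the object and method names (CacheModel, withoutCache, withCache) are made up for illustration. It only contrasts "forward every record" with "keep the latest value per key until a flush".

```scala
import scala.collection.mutable

// Toy model only: this is NOT Kafka Streams code, just an illustration.
object CacheModel {
  // No materialized store, hence no cache: every input record
  // is forwarded downstream as soon as it arrives.
  def withoutCache[K, V](records: Seq[(K, V)]): Vector[(K, V)] =
    records.toVector

  // With a cache (simplified to a single flush at the end): later values
  // overwrite earlier ones for the same key, so only the latest value per
  // key is forwarded. LinkedHashMap keeps the first-arrival order of keys.
  def withCache[K, V](records: Seq[(K, V)]): Vector[(K, V)] = {
    val cache = mutable.LinkedHashMap.empty[K, V]
    records.foreach { case (k, v) => cache.update(k, v) }
    cache.toVector
  }
}
```

Fed with three updates for key1 followed by one for key2, withoutCache returns all four records, while withCache returns only the last value for key1 plus the key2 record. In a real application the cache is flushed on commit or when it fills up, so the deduplication is best effort rather than guaranteed.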
If you want to enforce KTable materialization, you can pass Materialized.as("someStoreName") into the StreamsBuilder#table() method.
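Applied to the topology from the question, that could look roughly like the sketch below, using the kafka-streams-scala API. Several things here are assumptions: the store name "customer-store", the helper name buildTopology, the Customer field names id and name (only surname and age appear in the question), and the implicit serdes for Customer and CustomerMapped, which the caller is expected to provide.

```scala
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.Materialized

// Field names id and name are assumed; the question only uses surname and age.
case class Customer(id: Int, name: String, surname: String, age: Int)
case class CustomerMapped(surname: String, age: Int)

// Hypothetical helper: the serdes for the two case classes must be supplied
// by the caller (for example, the Schema Registry backed ones).
def buildTopology(source: String, target: String)(
    implicit customerSerde: Serde[Customer],
    mappedSerde: Serde[CustomerMapped]
): Topology = {
  val builder = new StreamsBuilder()
  builder
    .table[String, Customer](
      source,
      // Naming a store forces the source KTable to be materialized,
      // which makes the record cache applicable again.
      Materialized.as[String, Customer, ByteArrayKeyValueStore]("customer-store")
    )
    .mapValues(c => CustomerMapped(c.surname, c.age))
    .toStream
    .to(target)
  builder.build()
}
```

Even with the store in place, how many intermediate updates reach the target topic still depends on the commit interval and cache size, so a test should not rely on an exact number of emitted records per key.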