Events that should be emitted by a KTable


Question

I am trying to test a topology that, as the last node, has a KTable. My test is using a full-blown Kafka Cluster (through confluent's Docker images) so I am not using the TopologyTestDriver.

My topology has input of key-value types String -> Customer and output of String -> CustomerMapped. The serdes, schemas and integration with Schema Registry all work as expected.

I am using Scala, Kafka 2.2.0, Confluent Platform 5.2.1 and kafka-streams-scala. My topology, as simplified as possible, looks something like this:

val otherBuilder = new StreamsBuilder()

otherBuilder
     .table[String,Customer](source)
     .mapValues(c => CustomerMapped(c.surname, c.age))
     .toStream.to(target)   

(all implicit serdes, Produced, Consumed, etc are the default ones and are found correctly)

My test consists of sending a few records (data) to the source topic synchronously and without pause, reading back from the target topic, and comparing the results with the expected output:

val data: Seq[(String, Customer)] = Vector(
   "key1" -> Customer(0, "Obsolete", "To be overridden", 0),
   "key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
   "key1" -> Customer(1, "Billy", "The Man", 32),
   "key2" -> Customer(2, "Tommy", "The Guy", 31),
   "key3" -> Customer(3, "Jenny", "The Lady", 40)
)
val expected = Vector(
   "key1" -> CustomerMapped("The Man", 32),
   "key2" -> CustomerMapped("The Guy", 31),
   "key3" -> CustomerMapped("The Lady", 40)
)

I build the Kafka Streams application, setting, among other settings, the following two:

p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000")
val s: Long = 50L * 1024 * 1024
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, s.toString)

So I expect the KTable to use caching, with an interval of 5 seconds between commits and a cache size of 50MB (more than enough for my scenario).
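For reference, the two settings above would sit in a fuller configuration roughly like this (the application id and bootstrap servers shown here are placeholder assumptions, not values from the question):

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

// Hypothetical configuration sketch; only the last two entries come
// from the question, the rest are placeholder assumptions.
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-emission-test")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000")
val cacheSize: Long = 50L * 1024 * 1024
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize.toString)
```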

My problem is that the results I read back from the target topic always contain multiple entries for key1. I would have expected no events to be emitted for the records with Obsolete and Obsolete2. The actual output is:

Vector(
    "key1" -> CustomerMapped("To be overridden", 0),
    "key1" -> CustomerMapped("To be overridden2", 0),
    "key1" -> CustomerMapped("The Man", 32),
    "key2" -> CustomerMapped("The Guy", 31),
    "key3" -> CustomerMapped("The Lady", 40)
)

One final thing to mention: this test used to work as expected until I updated Kafka from 2.1.0 to 2.2.0. I verified this by downgrading my application again.

I am quite confused. Can anyone point out whether something changed in the behaviour of KTables in the 2.2.x versions? Or are there new settings I now have to set to control the emission of events?

Answer

In Kafka 2.2, an optimization was introduced to reduce the resource footprint of Kafka Streams. A KTable is not necessarily materialized if it's not required for the computation. This holds for your case, because mapValues() can be computed on-the-fly. Because the KTable is not materialized, there is no cache and thus each input record produces one output record.

Compare: https://issues.apache.org/jira/browse/KAFKA-6036

If you want to enforce KTable materialization, you can pass Materialized.as("someStoreName") into the StreamsBuilder#table() method.
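Applied to the topology from the question, the fix might look like this (the store name "customer-store" is an arbitrary example, and the implicit serdes for String and Customer are assumed to be in scope as in the original code):

```scala
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.Materialized

val builder = new StreamsBuilder()

builder
  // Passing Materialized.as(...) forces the KTable to be materialized,
  // which re-enables caching and thus deduplication of updates per key
  // within the commit interval.
  .table[String, Customer](
    source,
    Materialized.as[String, Customer, ByteArrayKeyValueStore]("customer-store")
  )
  .mapValues(c => CustomerMapped(c.surname, c.age))
  .toStream.to(target)
```

With the table materialized and caching enabled, only the latest value per key at each commit or cache eviction should be forwarded downstream, so the Obsolete records for key1 would be absorbed by the cache as in Kafka 2.1.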
