新建KTable不返回任何内容 [英] Newly build KTable returns nothing
问题描述
我正在尝试使用KTable消耗来自Kafka主题的事件.但是,它什么也不返回.当我使用KStream时,它返回并打印对象.这真是奇怪. 生产者和消费者可以是在这里找到
I am trying to use KTable to consume events from Kafka topic. But, it returns nothing. When I use KStream, it returns and prints objects. This is really strange. Producer and Consumer can be found here
//Not working
KTable<String, Customer> customerKTable = streamsBuilder.table("customer", Consumed.with(Serdes.String(), customerSerde),Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name()));
customerKTable.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)));
//KStream working
KStream<String, Customer> customerKStream= streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde));
customerKStream.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)))
推荐答案
经过大量研究,我发现了自己的语法问题.根据Confluent/Kafka文档,我使用的语法是有效的,但无法正常工作.将与Kafka团队产生错误.现在,有效的新语法为
After a lot of research, I found the issue in my syntax. The syntax I am using is valid, based on Confluent/Kafka documentation but it's not working. Will raise a bug with Kafka team. Now, new syntax that is working is
KTable<String, Customer> customerKTable = streamsBuilder.table("customer",Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name())
.withKeySerde(Serdes.String())
.withValueSerde(customerSerde));
我应该包括withKeySerde()
和withValueSerde()
来使KTable工作.但这并未提及Confluent/Kafka文档
I should include withKeySerde()
and withValueSerde()
to make KTable work. But this is not mentioned Confluent/Kafka documentation
这篇关于新建KTable不返回任何内容的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!