新构建的 KTable 不返回任何内容 [英] Newly build KTable returns nothing

查看:25
本文介绍了新构建的 KTable 不返回任何内容的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 KTable 来使用来自 Kafka 主题的事件.但是,它什么都不返回.当我使用 KStream 时,它返回并打印对象.这真的很奇怪.生产者和消费者可以在这里找到

I am trying to use KTable to consume events from Kafka topic. But, it returns nothing. When I use KStream, it returns and prints objects. This is really strange. Producer and Consumer can be found here

//Not working    
KTable<String, Customer> customerKTable = streamsBuilder.table("customer", Consumed.with(Serdes.String(), customerSerde),Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name()));
customerKTable.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)));

//KStream working
KStream<String, Customer> customerKStream= streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde));
customerKStream.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)))

推荐答案

经过大量研究,我发现了我的语法问题.我使用的语法是有效的,基于 Confluent/Kafka 文档,但它不起作用.将向 Kafka 团队提出错误.现在,有效的新语法是

After a lot of research, I found the issue in my syntax. The syntax I am using is valid, based on Confluent/Kafka documentation but it's not working. Will raise a bug with Kafka team. Now, new syntax that is working is

KTable<String, Customer> customerKTable = streamsBuilder.table("customer",Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name())
                                                            .withKeySerde(Serdes.String())
                                                            .withValueSerde(customerSerde));

我应该包括 withKeySerde()withValueSerde() 以使 KTable 工作.但这并没有提到 Confluent/Kafka 文档

I should include withKeySerde() and withValueSerde() to make KTable work. But this is not mentioned Confluent/Kafka documentation

这篇关于新构建的 KTable 不返回任何内容的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆