是否可以使用密钥和分区来使用kafka消息? [英] Is it possible to consume kafka messages using key and partition?
问题描述
我正在使用 kafka_2.12 版本 2.3.0 ,其中我使用分区和密钥将数据发布到kafka主题中.我需要找到一种方法,可以使用键和分区组合使用主题中的特定消息.这样,我就不必消耗所有消息并为正确的消息进行迭代.
I am using kafka_2.12 version 2.3.0 where I am publishing data into kafka topic using partition and key. I need to find a way using which I can consume a particular message from topic using key and partition combination. That way I won't have to consume all the messages and iterate for the correct one.
现在我只能这样做
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)
consumer.subscribe(Collections.singletonList("topic"))
ConsumerRecords<String, String> records = consumer.poll(100)
def data = records.findAll {
it -> it.key().equals(key)
}
推荐答案
有两种使用主题/分区的方法:
There are two ways to consume topic/partitions is:
- KafkaConsumer.assign():文档链接
因此,您无法通过按键获取消息.
So, You can't get messages by key.
如果您没有扩展分区的计划,请考虑使用assign()方法.因为带有特定密钥的所有消息都将进入同一分区.
If you don't have a plan to expand partitions, consider using assign() method. Because all the messages that come with the specific key will go to the same partition.
使用方法:
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
TopicPartition partition = new TopicPartition("some-topic", 0);
consumer.assign(Arrays.asList(partition));
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
String data = records.findAll {
it -> it.key().equals(key)
}
}
这篇关于是否可以使用密钥和分区来使用kafka消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!