是否可以使用密钥和分区来使用 kafka 消息? [英] Is it possible to consume kafka messages using key and partition?
问题描述
我正在使用 kafka_2.12 版本 2.3.0,其中我使用分区和密钥将数据发布到 kafka 主题中.我需要找到一种方法,使用该方法可以使用键和分区组合来使用来自主题的特定消息.这样我就不必消耗所有消息并迭代正确的消息.
I am using kafka_2.12 version 2.3.0 where I am publishing data into kafka topic using partition and key. I need to find a way using which I can consume a particular message from topic using key and partition combination. That way I won't have to consume all the messages and iterate for the correct one.
现在我只能这样做
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)
consumer.subscribe(Collections.singletonList("topic"))
ConsumerRecords<String, String> records = consumer.poll(100)
def data = records.findAll {
it -> it.key().equals(key)
}
推荐答案
topic/partitions的消费方式有两种:
There are two ways to consume topic/partitions is:
因此,您无法通过密钥获取消息.
So, You can't get messages by key.
如果您没有扩展分区的计划,请考虑使用assign() 方法.因为所有带有特定键的消息都会进入同一个分区.
If you don't have a plan to expand partitions, consider using assign() method. Because all the messages that come with the specific key will go to the same partition.
使用方法:
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
TopicPartition partition = new TopicPartition("some-topic", 0);
consumer.assign(Arrays.asList(partition));
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
String data = records.findAll {
it -> it.key().equals(key)
}
}
这篇关于是否可以使用密钥和分区来使用 kafka 消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!