是否可以使用密钥和分区来使用 kafka 消息? [英] Is it possible to consume kafka messages using key and partition?

查看:40
本文介绍了是否可以使用密钥和分区来使用 kafka 消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 kafka_2.12 版本 2.3.0,其中我使用分区和密钥将数据发布到 kafka 主题中.我需要找到一种方法,使用该方法可以使用键和分区组合来使用来自主题的特定消息.这样我就不必消耗所有消息并迭代正确的消息.

I am using kafka_2.12 version 2.3.0 where I am publishing data into kafka topic using partition and key. I need to find a way using which I can consume a particular message from topic using key and partition combination. That way I won't have to consume all the messages and iterate for the correct one.

现在我只能这样做

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)
consumer.subscribe(Collections.singletonList("topic"))
ConsumerRecords<String, String> records = consumer.poll(100)
def data = records.findAll {
    it -> it.key().equals(key)
}

推荐答案

topic/partitions的消费方式有两种:

There are two ways to consume topic/partitions is:

  1. KafkaConsumer.assign() : 文档链接
  2. KafkaConsumer.subscribe() : 文档链接

因此,您无法通过密钥获取消息.

So, You can't get messages by key.

如果您没有扩展分区的计划,请考虑使用assign() 方法.因为所有带有特定键的消息都会进入同一个分区.

If you don't have a plan to expand partitions, consider using assign() method. Because all the messages that come with the specific key will go to the same partition.

使用方法:

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
TopicPartition partition = new TopicPartition("some-topic", 0);
consumer.assign(Arrays.asList(partition));

while(true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    String data = records.findAll {
        it -> it.key().equals(key)
    }
}

这篇关于是否可以使用密钥和分区来使用 kafka 消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆