是否可以使用密钥和分区来使用kafka消息? [英] Is it possible to consume kafka messages using key and partition?

查看:56
本文介绍了是否可以使用密钥和分区来使用kafka消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 kafka_2.12 版本 2.3.0 ,其中我使用分区和密钥将数据发布到kafka主题中.我需要找到一种方法,可以使用键和分区组合使用主题中的特定消息.这样,我就不必消耗所有消息并为正确的消息进行迭代.

I am using kafka_2.12 version 2.3.0 where I am publishing data into kafka topic using partition and key. I need to find a way using which I can consume a particular message from topic using key and partition combination. That way I won't have to consume all the messages and iterate for the correct one.

现在我只能这样做

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)
consumer.subscribe(Collections.singletonList("topic"))
ConsumerRecords<String, String> records = consumer.poll(100)
def data = records.findAll {
    it -> it.key().equals(key)
}

推荐答案

有两种使用主题/分区的方法:

There are two ways to consume topic/partitions is:

  1. KafkaConsumer.assign():因此,您无法通过按键获取消息.

    So, You can't get messages by key.

    如果您没有扩展分区的计划,请考虑使用assign()方法.因为带有特定密钥的所有消息都将进入同一分区.

    If you don't have a plan to expand partitions, consider using assign() method. Because all the messages that come with the specific key will go to the same partition.

    使用方法:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    TopicPartition partition = new TopicPartition("some-topic", 0);
    consumer.assign(Arrays.asList(partition));
    
    while(true){
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        String data = records.findAll {
            it -> it.key().equals(key)
        }
    }
    

    这篇关于是否可以使用密钥和分区来使用kafka消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆