kafka consumer to dynamically detect topics added


Problem description


I'm using KafkaConsumer to consume messages from a Kafka server (topics).

  • It works fine for topics created before the consumer code starts.

But the problem is that it does not work for topics created dynamically (that is, after the consumer code has started), even though the API says it supports dynamic topic creation. Here is the link for your reference:

Kafka version used : 0.9.0.1

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Here is the Java code:

    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import java.util.regex.Pattern;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // Subscribe to every topic whose name matches the pattern;
    // HandleRebalance is a user-supplied ConsumerRebalanceListener.
    Pattern r = Pattern.compile("siddu(\\d)*");
    consumer.subscribe(r, new HandleRebalance());

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(partition.partition() + ": " + record.offset() + ": " + record.value());
                }
                // Commit the offset just past the last record consumed in this partition.
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        consumer.close();
    }

NOTE: My topic names match the regular expression, and if I restart the consumer it does start reading the messages pushed to the topic.
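Since pattern subscription stands or falls on whether `Pattern.matcher(topic).matches()` accepts the topic name, a quick standalone check of the regex can rule out a naming mismatch. A minimal sketch (the class name `TopicPatternCheck` is just for illustration):

```java
import java.util.regex.Pattern;

public class TopicPatternCheck {
    // Same pattern used in the consumer's subscribe() call above.
    static final Pattern TOPIC_RE = Pattern.compile("siddu(\\d)*");

    static boolean matches(String topic) {
        // KafkaConsumer.subscribe(Pattern) requires a full match of the topic name.
        return TOPIC_RE.matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("siddu1"));  // matches
        System.out.println(matches("orders"));  // does not match
    }
}
```

Note that `(\d)*` allows zero digits, so a topic named exactly `siddu` would also be picked up.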

Any help is really appreciated...

Solution

There was an answer to this in the Apache Kafka mailing list archives. I am copying it below:

The consumer supports a configuration option "metadata.max.age.ms" which basically controls how often topic metadata is fetched. By default, this is set fairly high (5 minutes), which means it will take up to 5 minutes to discover new topics matching your regular expression. You can set this lower to discover topics quicker.

So in your props you can:

    props.put("metadata.max.age.ms", 5000);

This will cause your consumer to find out about new topics every 5 seconds.
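So the fix is a single extra entry in the consumer configuration from the question. A minimal sketch (the class name `MetadataAgeConfig` is hypothetical, and 5000 ms is just an example value; pick whatever discovery latency you can tolerate, since lower values mean more frequent metadata requests to the brokers):

```java
import java.util.Properties;

public class MetadataAgeConfig {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Lower the metadata refresh interval so newly created topics that
        // match the subscription pattern are discovered within ~5 seconds
        // instead of the 5-minute default (300000 ms).
        props.put("metadata.max.age.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("metadata.max.age.ms"));
    }
}
```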

