Kafka pattern subscription: rebalancing is not triggered on new topics


Question

According to the Kafka javadocs, if I:

  • subscribe to a pattern
  • create a topic that matches the pattern

a rebalance should occur, which makes the consumer read from that new topic. But that's not happening.

If I stop and start the consumer, it does pick up the new topic, so I know the new topic matches the pattern. There's a possible duplicate of this question (https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics), but that question got nowhere.

I'm looking at the Kafka logs and there are no errors; it just doesn't trigger a rebalance. A rebalance is triggered when consumers join or die, but not when new topics are created (not even when partitions are added to existing topics, but that's another subject).

I'm using Kafka 0.10.0.0 and the official Java client for the "new consumer API", meaning the broker-side GroupCoordinator instead of the fat client + ZooKeeper.

This is the code for the sample consumer:

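import com.google.common.io.Resources; // Guava, assumed from the Resources.getResource call
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.regex.Pattern;

public class SampleConsumer {
    public static void main(String[] args) throws IOException {
        KafkaConsumer<String, String> consumer;
        // Load the consumer settings from consumer.props on the classpath.
        try (InputStream props = Resources.getResource("consumer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            properties.setProperty("group.id", "my-group");

            System.out.println(properties.get("group.id"));
            consumer = new KafkaConsumer<>(properties);
        }
        // Subscribe to every topic whose name matches the pattern.
        // SampleRebalanceListener (not shown) implements ConsumerRebalanceListener.
        Pattern pattern = Pattern.compile("mytopic.+");
        consumer.subscribe(pattern, new SampleRebalanceListener());
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("%s %s\n", record.topic(), record.value());
            }
        }
    }
}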

In the producer, I'm sending messages to topics named mytopic1, mytopic2, etc.

Patterns are pretty much useless if the rebalance is not triggered.

Do you know why the rebalance is not happening?

Recommended answer

The documentation mentions "The pattern matching will be done periodically against topics existing at the time of check." It turns out that "periodically" corresponds to the metadata.max.age.ms property. By setting that property (inside "consumer.props" in my code sample) to, e.g., 5000, I can see that it detects new topics and partitions every 5 seconds.
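
For illustration, the same override can be applied in code instead of in consumer.props. This is only a sketch: metadata.max.age.ms is the real consumer setting (its default is 300000 ms, i.e. 5 minutes, which is why a new topic can appear to be ignored for a long time), but the class ConsumerConfigSketch and the helper withMetadataMaxAge are hypothetical names introduced here. The block uses only java.util.Properties so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Real Kafka consumer config key, as named in the answer.
    static final String METADATA_MAX_AGE = "metadata.max.age.ms";

    // Hypothetical helper: copies the base settings and overrides the
    // metadata refresh interval, leaving the base object untouched.
    static Properties withMetadataMaxAge(Properties base, long millis) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.setProperty(METADATA_MAX_AGE, Long.toString(millis));
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("group.id", "my-group");

        // 5000 ms is the value tried in the answer; pick what suits your load.
        Properties tuned = withMetadataMaxAge(base, 5000L);
        System.out.println(METADATA_MAX_AGE + "=" + tuned.getProperty(METADATA_MAX_AGE));
    }
}
```

The resulting Properties object would then be passed to the KafkaConsumer constructor, exactly as the question's sample does with the loaded file.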

This is as designed, according to this JIRA ticket, https://issues.apache.org/jira/browse/KAFKA-3854:

The final note on the JIRA stating that a later created topic that matches a consumer's subscription pattern would not be assigned to the consumer upon creation seems to be as designed. A repeat subscribe() to the same pattern would be needed to handle that case.

The periodic metadata refresh does the "repeat subscribe()" mentioned in the ticket.
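
To make the timing concrete, here is a toy model of that behavior. It is not the Kafka client's implementation: PatternRefreshSketch and its methods are hypothetical, time is passed in by hand, and the "cluster" is just a set of topic names. It only shows that a topic created between two checks stays invisible until the cached view is older than the refresh interval.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

public class PatternRefreshSketch {
    private final Pattern pattern;
    private final long maxAgeMs;      // stands in for metadata.max.age.ms
    private long lastRefreshMs;
    private Set<String> subscribed = new TreeSet<>();

    PatternRefreshSketch(Pattern pattern, long maxAgeMs, long nowMs, Set<String> existingTopics) {
        this.pattern = pattern;
        this.maxAgeMs = maxAgeMs;
        refresh(nowMs, existingTopics);
    }

    // Re-evaluates the pattern against the topics that exist right now.
    private void refresh(long nowMs, Set<String> existingTopics) {
        Set<String> matched = new TreeSet<>();
        for (String topic : existingTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        subscribed = matched;
        lastRefreshMs = nowMs;
    }

    // Simulated poll: refreshes only once the cached view is at least maxAgeMs old.
    Set<String> poll(long nowMs, Set<String> existingTopics) {
        if (nowMs - lastRefreshMs >= maxAgeMs) {
            refresh(nowMs, existingTopics);
        }
        return subscribed;
    }

    public static void main(String[] args) {
        Set<String> cluster = new TreeSet<>();
        cluster.add("mytopic1");
        cluster.add("othertopic");

        PatternRefreshSketch consumer =
                new PatternRefreshSketch(Pattern.compile("mytopic.+"), 5000L, 0L, cluster);

        cluster.add("mytopic2");                           // topic created at t = 1000 ms
        System.out.println(consumer.poll(1000L, cluster)); // prints [mytopic1]
        System.out.println(consumer.poll(5000L, cluster)); // prints [mytopic1, mytopic2]
    }
}
```

In this model the new topic is picked up at the first poll after the interval elapses, which matches what the answer observed with a 5-second setting.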

This is confusing coming from Kafka 0.8, where there was true triggering based on ZooKeeper watches instead of polling. IMO 0.9 is more of a downgrade for this scenario: instead of "just in time" rebalancing, you get either high-frequency polling with overhead, or low-frequency polling with long delays before it reacts to new topics/partitions.
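
The trade-off can be put in rough numbers. The sketch below assumes the only metadata refreshes are the periodic ones (a simplification) and treats the refresh interval as the worst-case delay before a new topic is noticed. RefreshTradeoffSketch and refreshesPerHour are hypothetical names used only for this illustration.

```java
public class RefreshTradeoffSketch {
    private static final long MS_PER_HOUR = 3_600_000L;

    // Periodic metadata refreshes per consumer per hour for a given interval.
    static long refreshesPerHour(long maxAgeMs) {
        if (maxAgeMs <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        return MS_PER_HOUR / maxAgeMs;
    }

    public static void main(String[] args) {
        // 5000 ms is the value from the answer; 300000 ms is the client default.
        long[] intervalsMs = {5_000L, 60_000L, 300_000L};
        for (long interval : intervalsMs) {
            System.out.printf("interval=%d ms -> worst-case delay ~%d s, %d refreshes/hour%n",
                    interval, interval / 1000, refreshesPerHour(interval));
        }
    }
}
```

At 5000 ms each consumer refreshes about 720 times an hour; at the 300000 ms default it refreshes 12 times an hour but may take up to 5 minutes to see a new topic.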
