卡夫卡模式订阅。新主题没有触发重新平衡 [英] Kafka pattern subscription. Rebalancing is not being triggered on new topic

查看:118
本文介绍了卡夫卡模式订阅。新主题没有触发重新平衡的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

根据 kafka javadocs 如果我:


  • 订阅模式

  • 创建与模式匹配的主题

重新平衡应该发生,这使消费者从该新主题中读取。但这种情况并没有发生。

A rebalance should occur, which makes the consumer read from that new topic. But that's not happening.

如果我停止并启动消费者,它确实会接受新主题。所以我知道新主题与模式匹配。在 https:// stackoverflow中可能存在此问题的重复内容。 com / questions / 37120537 / whitelist-filter-in-kafka-doesnt-pick-up-new-topics 但这个问题无处可去。

If I stop and start the consumer, it does pick up the new topic. So I know the new topic matches the pattern. There's a possible duplicate of this question in https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics but that question got nowhere.

我'我看到kafka日志并没有错误,它只是不会触发重新平衡。当消费者加入或死亡时触发重新平衡,但是在创建新主题时不会触发(即使分区被添加到现有主题,但这是另一个主题)。

I'm seeing the kafka logs and there are no errors, it just doesn't trigger a rebalance. The rebalance is triggered when consumers join or die, but not when new topics are created (not even when partitions are added to existing topics, but that's another subject).

I我正在使用kafka 0.10.0.0和新消费者API的官方Java客户端,意思是代理GroupCoordinator而不是胖客户端+ zookeeper。

I'm using kafka 0.10.0.0, and the official Java client for the "New Consumer API", meaning broker GroupCoordinator instead of fat client + zookeeper.

这是代码对于示例使用者:

This is the code for the sample consumer:

public class SampleConsumer {
public static void main(String[] args) throws IOException {
    KafkaConsumer<String, String> consumer;
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
        Properties properties = new Properties();
        properties.load(props);
        properties.setProperty("group.id", "my-group");

        System.out.println(properties.get("group.id"));
        consumer = new KafkaConsumer<>(properties);
    }
    Pattern pattern = Pattern.compile("mytopic.+");
    consumer.subscribe(pattern, new SampleRebalanceListener());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("%s %s\n", record.topic(), record.value());
        }
    }
}

}

在制作人中,我正在向名为mytopic1,mytopic2等的主题发送消息。

In the producer, I'm sending messages to topics named mytopic1, mytopic2, etc.

如果模式几乎没用不会触发重新平衡。

Patterns are pretty much useless if the rebalance is not triggered.

你知道为什么没有发生重新平衡吗?

Do you know why the rebalance is not happening?

推荐答案

文档提到模式匹配将定期针对检查时存在的主题进行。事实证明,周期性对应于metadata.max.age.ms属性。通过将该属性(在我的代码示例中的consumer.props内)设置为5000,我可以看到它每5秒检测一次新主题和分区。

The documentation mentions "The pattern matching will be done periodically against topics existing at the time of check.". It turns out the "periodically" corresponds to the metadata.max.age.ms property. By setting that property (inside "consumer.props" in my code sample) to i.e. 5000 I can see it detects new topics and partitions every 5 seconds.

这是根据这张jira票据 https://issues.apache.org/jira/browse/ KAFKA-3854

This is as designed, according to this jira ticket https://issues.apache.org/jira/browse/KAFKA-3854:


关于JIRA的最后一点说明,后来创建的主题与消费者的订阅模式相匹配在创作时被分配给消费者似乎是按照设计的。处理该情况需要对相同模式重复subscribe()。

The final note on the JIRA stating that a later created topic that matches a consumer's subscription pattern would not be assigned to the consumer upon creation seems to be as designed. A repeat subscribe() to the same pattern would be needed to handle that case.

刷新元数据轮询执行故障单中提到的重复订阅()。

The refresh metadata polling does the "repeat subscribe()" mentioned in the ticket.

这是令人困惑的来自Kafka 0.8,其中真正的触发基于zookeper手表,而不是轮询。对于这种情况,IMO 0.9更像是降级,而不是及时重新平衡,这会成为带有开销的高频率轮询,或者是在对新主题/分区作出反应之前长时间进行低频率轮询。

This is confusing coming from Kafka 0.8 where there was true triggering based on zookeper watches, instead of polling. IMO 0.9 is more of a downgrade for this scenario, instead of "just in time" rebalancing, this becomes either high frequency polling with overhead, or low frequency polling with long times before it reacts to new topics/partitions.

这篇关于卡夫卡模式订阅。新主题没有触发重新平衡的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆