How do I use multiple consumers in Kafka?


Question

I am a new student learning Kafka and I've run into some fundamental issues with understanding multiple consumers that articles, documentation, etc. have not been too helpful with so far.

One thing I have tried to do is write my own high level Kafka producer and consumer and run them simultaneously, publishing 100 simple messages to a topic and having my consumer retrieve them. I have managed to do this successfully, but when I try to introduce a second consumer to consume from the same topic that messages were just published to, it receives no messages.
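
For context, a minimal sketch of the kind of producer loop described here, assuming a topic named "test" and Integer/String serializers to match the consumer below (the class and topic names are placeholders, not taken from the question):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Publish 100 simple messages to the topic, as described above.
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("test", i, "message-" + i));
            }
        }
    }
}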

It was my understanding that for each topic, you could have consumers from separate consumer groups and each of these consumer groups would get a full copy of the messages produced to some topic. Is this correct? If not, what would be the proper way for me to set up multiple consumers? This is the consumer class that I have written so far:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AlternateConsumer extends Thread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;
    private final Boolean isAsync = false;

    public AlternateConsumer(String topic, String consumerGroup) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", consumerGroup);
        properties.put("partition.assignment.strategy", "roundrobin");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<Integer, String>(properties);
        consumer.subscribe(topic);
        this.topic = topic;
    }

    public void run() {
        // Poll the broker in a loop and print every record received.
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(0);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
            }
        }
    }
}

Furthermore, I noticed that originally I was testing the above consumption for a topic 'test' with only a single partition. When I added another consumer to an existing consumer group, say 'testGroup', this triggered a Kafka rebalance which increased the latency of my consumption by a significant amount, on the order of seconds. I thought that this was an issue with rebalancing since I only had a single partition, but when I created a new topic 'multiplepartitions' with, say, 6 partitions, similar issues arose where adding more consumers to the same consumer group caused latency problems. I have looked around and people are telling me I should be using a multi-threaded consumer -- can anyone shed light on that?
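
As a point of reference, KafkaConsumer instances are not safe to share across threads, so the common pattern is one consumer instance per thread, which the AlternateConsumer class above already supports since it extends Thread. A minimal launcher sketch (the topic and group names here are just placeholders):

public class ConsumerLauncher {
    public static void main(String[] args) {
        // Two consumers in the same group: the topic's partitions are split between them.
        new AlternateConsumer("multiplepartitions", "testGroup").start();
        new AlternateConsumer("multiplepartitions", "testGroup").start();

        // A consumer in a separate group: it independently receives every message.
        new AlternateConsumer("multiplepartitions", "otherGroup").start();
    }
}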

Answer

I think your problem lies with the auto.offset.reset property. When a new consumer reads from a partition and there's no previous committed offset, the auto.offset.reset property is used to decide what the starting offset should be. If you set it to "largest" (the default) you start reading at the latest (last) message. If you set it to "smallest" you get the first available message.

So add:

properties.put("auto.offset.reset", "smallest");

and try again.

* EDIT *

smallest"和largest"不久前被弃用了.您现在应该使用最早"或最新".任何问题,请查看文档

"smallest" and "largest" were deprecated a while back. You should use "earliest" or "latest" now. Any questions, check the docs
