Kafka command-line consumer reads, but cannot read through Java


Problem Description

I have manually created topic test with this command:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

and, using this command:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

I inserted these records:

This is a message
This is another message
This is a message2

First, I consume messages through the command line like this:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

and all the records are successfully shown. Then, I try to implement a consumer in Java using this code:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaSubscriber {

    public void consume() {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));
        // also tried with this call:
        // consumer.subscribe(Arrays.asList("test"));

        System.out.println("Starting to read data...");

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                System.out.println("Number of records found: " + records.count());
                for (ConsumerRecord<String, String> rec : records) {
                    System.out.println(rec.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

But the output is:

Starting to read data...
0
0
0
0
0
....

Which means that it does not find any records in topic test. I also tried to publish some records after the Java consumer has started, but the same again. Any ideas what might be going wrong?

EDIT: After adding the line:

 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

the consumer now reads only when I write new records to the topic. It does not read all the records from the beginning.

Recommended Answer

By default, if no offsets have previously been committed for the group, the consumer starts at the end of the topics.

Hence, if you are running it after having produced records, it won't receive them.

Notice that in your kafka-console-consumer.sh command you have the --from-beginning flag, which forces the consumer to instead start from the beginning of the topic.

One workaround, as suggested in a comment, is to set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest. However, I'd be careful with that setting, as your consumer will consume from the start of the topics, and this could be a lot of data in a real use case.
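If the group has already committed offsets, another way to replay the topic once (without changing the consumer code) is to reset the group's offsets from the command line while the Java consumer is stopped. This is a sketch assuming the same broker, topic, and group id as in the question, and a Kafka release whose kafka-consumer-groups.sh supports --reset-offsets:

```shell
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group test-consumer-group \
  --topic test \
  --reset-offsets --to-earliest --execute
```

On the next start, the consumer will resume from the earliest offset because the group's committed position has been rewound.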

The easiest solution is: now that you've run your consumer once and it has created a group, simply rerun the producer. After that, when you run the consumer again, it will pick up from its last position, which is before the new producer messages.

On the other hand, if you mean to always reconsume all messages, then you have 2 options:

  • explicitly use seekToBeginning() when the consumer starts, to move its position to the beginning of the topics

  • set auto.offset.reset to earliest and disable auto offset commit by setting enable.auto.commit to false
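The second option can be sketched as follows. This is a minimal example, not a definitive implementation: it assumes kafka-clients on the classpath and a broker at localhost:9092, and it reuses the topic and group names from the question (the class name ReplayConsumer is made up for illustration):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReplayConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Always fall back to the earliest offset when there is no valid
        // committed position, and never commit, so every run re-reads the topic.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            // First option alternative: once poll() has assigned partitions,
            // consumer.seekToBeginning(consumer.assignment()) rewinds explicitly.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> rec : records) {
                    System.out.println(rec.value());
                }
            }
        }
    }
}
```

With committed offsets disabled, the group never remembers a position, so auto.offset.reset applies on every start and the whole topic is replayed.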

