Messages produced before the first consumer connected are lost


Problem description

I've created a topic in Kafka using kafka-topics.sh and tested it with the Java client:

kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 2 \
--topic my-topic

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"), new LoggingConsumerRebalanceListener(RandomStringUtils.randomAlphanumeric(3).toLowerCase()));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    Thread.sleep(500);
}

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
    String key = Integer.toString(i + 1);
    String value = RandomStringUtils.randomAlphabetic(100);
    LOGGER.info("Sending message {}", key);
    producer.send(new ProducerRecord<String, String>("my-topic", key, value));
    Thread.sleep(100);
}
producer.close();

The producer and the consumer are separate blocks of code that I start independently.

I have observed that the code above works properly when run in this sequence:

  • Set up the topic
  • Run the consumer
  • Run the producer
  • Run the producer ...

However, in this sequence:

  • Set up the topic
  • Run the producer (1)
  • Run the consumer
  • Run the producer

the messages from the first run of the producer are lost. Later, if I stop the consumer, run the producer and then run the consumer again, I get all the messages. Only the messages produced before the first consumer subscribed are lost, even though I explicitly created the topic on the command line.

What am I doing wrong here? How can I prevent messages from getting lost?

Recommended answer

By default, the consumer will read from the latest offset.

If you run "producer (1)" and only after that start the consumer, the consumer ignores the messages from that producer run and waits for new messages produced by the second producer run.

This behaviour of reading from the latest offset can be changed through the configuration auto.offset.reset.
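For example, here is a minimal sketch of consumer properties with that setting (the question does not show its props, so the broker address, deserializers and group name below are assumptions, not taken from the original code):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker address
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                  // assumed group name
// "earliest" only applies when the group has no committed offset yet; the default is "latest"
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);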

"Later, if I stop the consumer, run the producer and run the consumer, I'm getting all messages"

This happens because your consumer has a fixed consumer group (configuration group.id), and the default setting of auto.offset.reset no longer has any effect: the group is already registered with Kafka, and the consumer continues reading the topic from where it left off.
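You can check the committed position of a group yourself with the kafka-consumer-groups.sh tool that ships with Kafka (the group name here is an assumption; use whatever group.id your consumer is configured with):

kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group my-group

The CURRENT-OFFSET column shows the position committed for the group on each partition; once that exists, the consumer resumes from there and auto.offset.reset is not consulted.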

To conclude, if you do not want to miss any messages when running your second sequence, set auto.offset.reset=earliest and define a new, unique group.id.
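A minimal sketch of that recommendation, building on the properties shown above (the UUID-based name is just one hypothetical way to get a unique id per run, not something from the original question):

// requires java.util.UUID
// A group id that is new on every run has no committed offsets in Kafka,
// so auto.offset.reset takes effect and "earliest" makes the consumer read the topic from the beginning.
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-topic-reader-" + UUID.randomUUID());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Re-reading everything from the start on each run is usually only desirable for testing; a long-lived consumer would normally keep a stable group.id and rely on its committed offsets.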

