Kafka 消费者异常和偏移提交 [英] Kafka consumer exception and offset commits

查看:43
本文介绍了Kafka 消费者异常和偏移提交的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在尝试为 Spring Kafka 做一些 POC 工作.具体来说,我想尝试在处理 Kafka 中的消息时处理错误的最佳实践是什么.

I've been trying to do some POC work for Spring Kafka. Specifically, I wanted to experiment with what are the best practices in terms of dealing with errors while consuming messages within Kafka.

我想知道是否有人能够提供帮助:

I am wondering if anyone is able to help with:

  1. 分享围绕 Kafka 消费者应该做什么的最佳实践出现故障时
  2. 帮助我了解 AckMode Record 是如何工作的,以及如何在 listener 方法中抛出异常时防止提交到 Kafka 偏移队列.

2 的代码示例如下:

鉴于 AckMode 设置为 RECORD,根据 文档:

Given that AckMode is set to RECORD, which according to the documentation:

在处理完监听器返回时提交偏移量记录.

commit the offset when the listener returns after processing the record.

如果监听器方法抛出异常,我会认为偏移量不会增加.但是,当我使用下面的代码/配置/命令组合对其进行测试时,情况并非如此.偏移量仍然得到更新,下一条消息继续处理.

I would have thought the the offset would not be incremented if the listener method threw an exception. However, this was not the case when I tested it using the code/config/command combination below. The offset still gets updated, and the next message continues to be processed.

我的配置:

    private Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

   @Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    return factory;
}

我的代码:

@Component
public class KafkaMessageListener{
    @KafkaListener(topicPartitions = {@TopicPartition( topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))})
    public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException {
            throw new RuntimeException("Oops!");
    }

验证偏移的命令:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group

我正在使用 kafka_2.12-0.10.2.0 和 org.springframework.kafka:spring-kafka:1.1.3.RELEASE

I'm using kafka_2.12-0.10.2.0 and org.springframework.kafka:spring-kafka:1.1.3.RELEASE

推荐答案

容器(通过 ContainerProperties)有一个属性,ackOnError,默认情况下为 true...

The container (via ContainerProperties) has a property, ackOnError which is true by default...

/**
 * Set whether or not the container should commit offsets (ack messages) where the
 * listener throws exceptions. This works in conjunction with {@link #ackMode} and is
 * effective only when the kafka property {@code enable.auto.commit} is {@code false};
 * it is not applicable to manual ack modes. When this property is set to {@code true}
 * (the default), all messages handled will have their offset committed. When set to
 * {@code false}, offsets will be committed only for successfully handled messages.
 * Manual acks will be always be applied. Bear in mind that, if the next message is
 * successfully handled, its offset will be committed, effectively committing the
 * offset of the failed message anyway, so this option has limited applicability.
 * Perhaps useful for a component that starts throwing exceptions consistently;
 * allowing it to resume when restarted from the last successfully processed message.
 * @param ackOnError whether the container should acknowledge messages that throw
 * exceptions.
 */
public void setAckOnError(boolean ackOnError) {
    this.ackOnError = ackOnError;
}

但是请记住,如果下一条消息成功,则无论如何都会提交其偏移量,这也有效地提交了失败的偏移量.

Bear in mind, though, that if the next message is successful, its offset will be committed anyway, which effectively commits the failed offset too.

编辑

从 2.3 版开始,ackOnError 现在默认为 false.

Starting with version 2.3, ackOnError is now false by default.

这篇关于Kafka 消费者异常和偏移提交的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆