带有min.insync.replicas的NotEnoughReplicasException的异常行为 [英] Unexpected behaviour of NotEnoughReplicasException with min.insync.replicas

查看:376
本文介绍了带有min.insync.replicas的NotEnoughReplicasException的异常行为的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是我先前的问题

我正在尝试kafka的min.insync.replicas,这是摘要:

  1. 在本地设置3个代理,用min.insync.replicas=2创建一个主题insync.
  2. 消息由 kafka-console-producer acks=all产生,并由 kafka-console-consumer
  3. 阅读
  4. 买断了2个经纪人,仅剩下1个insync.replicas,并期望生产者中的异常为在这里

但是它从未发生过,生产者正在生产消息,而消费者正在从控制台读取消息而没有任何错误.(

这给我带来了更多问题.

  1. 为什么在运行Java生产者而不是console-producer时会发生这种情况??
  2. 为什么异常发生在代理中而不是生产者(java代码)中? min.insync.replicas文档

    如果不能满足此最低要求,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)

在这种情况下,kafka如何保证可靠性?

解决方案

使用acks=all制作到同步副本数少于min.insync.replicas的主题时,制作者应获取NotEnoughReplicas.

之所以没有看到此行为,是因为控制台生产者命令和Java代码都有问题.

1.控制台制作人

要在kafka-console-producer.sh中启用acks=all,您需要指定--request-required-acks标志:

./kafka-console-producer.sh --broker-list localhost:9092 \
    --topic insync --request-required-acks all

这是因为--request-required-acks标志优先于通过--producer.config指定的值,并且默认为1.

2. Java代码

您粘贴的代码应该无法发送任何消息,但是按照当前的逻辑,您应该只会收到WARN日志消息,例如:

出现错误时,主题分区上的响应ID为15,请重试...

要在代码中得到通知,您需要检查send()的结果,方法是检查返回的Future或将Callback作为第二个参数传递.也不是NotEnoughReplicasException是可重试的异常,因此对于最新的客户端,默认情况下,它将永久重试,而不是通知调用代码.

例如:

 Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("retries", "5");
configs.put("acks", "all");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "value");
    producer.send(record, (RecordMetadata metadata, Exception exc) -> {
        if (exc != null) {
            System.err.println(exc);
        }
    });
}
 

当主题低于最小ISR时,制作人将重试5次,然后再使记录失败.然后它将调用带有异常的lambda,因此您将获得:

org.apache.kafka.common.errors.NotEnoughReplicasException:由于同步副本中的副本数少于所需数量,因此邮件被拒绝.


因此,可以得出结论,min.insync.replicas的处理正确,但是您需要注意将正确的参数传递给该工具,并在Java逻辑中正确处理异常.

This is a continuation of my previous question

I was experimenting kafka's min.insync.replicas and here's the summary:

  1. Setup 3 brokers in local, created a topic insync with min.insync.replicas=2.
  2. Messages were produced by kafka-console-producer with acks=all and read by kafka-console-consumer
  3. Bought down 2 brokers leaving just 1 insync.replicas and was expecting an exception in producer as mentioned here and here

But it never happened and producer was producing messages and consumer was reading them from console without any errors.(more details in previous question)

Then, Instead of producing messages from console-producer , I wrote a java producer with same configurations as console producer and finally got the following exception.

ERROR [Replica Manager on Broker 0]: Error processing append operation on partition insync-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync replicas for partition [insync,0] is [ 1 ], below required minimum [ 2 ]

Although I expected it from the producer(java code), it showed up in the kafka broker.

Console producer command

 ./kafka-console-producer.sh --broker-list localhost:9092 --topic insync --producer.config ../config/producer.properties

kafka-console-producer properties:

bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
compression.type=none
batch.size=20
acks=all

Java producer code:

public static void main(String[] args) {
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(producerConfigs());

        try {

            int count = 0;
            while (true) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>("insync",
                        "test message: " + count);
                kafkaProducer.send(record);

                Thread.sleep(3000);
                count++;
            }
        } catch (Exception e) {

            e.printStackTrace();
        } finally {
            kafkaProducer.close();
        }
    }

    private static Properties producerConfigs() {
        Properties properties = new Properties();

        properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        properties.put("acks", "all");

        return properties;
    }

This brings me more questions.

  1. Why does it happen while running java producer and not in console-producer.?
  2. Why does the exception occur in broker and not in producer(java code)? the documentation for min.insync.replicas says

    If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend)

How does kafka guarantee reliability in this case?

解决方案

When producing with acks=all to a topic that has less in-sync replicas than min.insync.replicas, the producer should get NotEnoughReplicas.

The reason why you are not seeing this behavior is because you have issues with both the console producer command and the Java code.

1. Console Producer

To enable acks=all in the kafka-console-producer.sh, you need to specify the --request-required-acks flag:

./kafka-console-producer.sh --broker-list localhost:9092 \
    --topic insync --request-required-acks all

This is because the --request-required-acks flag takes precedence over the values specified via --producer.config and it defaults to 1.

2. Java code

The code you have pasted should be failing to send any messages but with the current logic you should only get WARN log messages like:

Got error produce response with correlation id 15 on topic-partition , retrying ...

To get notified in your code, you need to check the result of send(), via either checking the Future it returns or passing a Callback as the 2nd argument. Not also that NotEnoughReplicasException is a retriable exception so with the latest client, by default, it will retry forever instead of notifying the calling code.

For example:

Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("retries", "5");
configs.put("acks", "all");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "value");
    producer.send(record, (RecordMetadata metadata, Exception exc) -> {
        if (exc != null) {
            System.err.println(exc);
        }
    });
}

When the topic is below minimun ISR, the producer will retry 5 times before failing the record. It will then call the lambda with the exception, so you'll get:

org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.


So to conclude, min.insync.replicas is handled correctly but you need to be careful to pass the correct arguments to the tool, handle exceptions correctly in Java logic.

这篇关于带有min.insync.replicas的NotEnoughReplicasException的异常行为的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆