Unexpected behaviour of NotEnoughReplicasException with min.insync.replicas
Question
This is a continuation of my previous question. I was experimenting with Kafka's min.insync.replicas setting, and here's the summary:
- Set up 3 brokers locally and created a topic insync with min.insync.replicas=2.
- Messages were produced by kafka-console-producer with acks=all and read by kafka-console-consumer.
- Brought down 2 brokers, leaving just 1 in-sync replica, and was expecting an exception in the producer as mentioned here and here.

But it never happened: the producer kept producing messages and the consumer kept reading them from the console without any errors. (More details in the previous question.)
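For reference, the setup described above can be reproduced with the topic tooling shipped with Kafka. This is a sketch under assumptions not stated in the question: three local brokers, and an older Kafka release (matching the --broker-list flag used below) where kafka-topics.sh still connects to ZooKeeper on localhost:2181; on newer releases the same commands take --bootstrap-server instead:

```shell
# Create a 3-replica topic that requires at least 2 in-sync replicas
./kafka-topics.sh --zookeeper localhost:2181 --create --topic insync \
    --partitions 1 --replication-factor 3 \
    --config min.insync.replicas=2

# Verify the configuration and watch the current ISR shrink as brokers go down
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic insync
```

The --describe output is a quick way to confirm how many replicas are actually in sync before producing.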
Then, instead of producing messages from the console producer, I wrote a Java producer with the same configuration as the console producer and finally got the following exception:
ERROR [Replica Manager on Broker 0]: Error processing append operation on partition insync-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync replicas for partition [insync,0] is [ 1 ], below required minimum [ 2 ]
Although I expected it from the producer (Java code), it showed up in the Kafka broker.
Console producer command
./kafka-console-producer.sh --broker-list localhost:9092 --topic insync --producer.config ../config/producer.properties
kafka-console-producer properties:
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
compression.type=none
batch.size=20
acks=all
Java producer code:
public static void main(String[] args) {
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(producerConfigs());
    try {
        int count = 0;
        while (true) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("insync",
                    "test message: " + count);
            kafkaProducer.send(record);
            Thread.sleep(3000);
            count++;
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        kafkaProducer.close();
    }
}

private static Properties producerConfigs() {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("acks", "all");
    return properties;
}
This brings up more questions:
- Why does it happen while running the Java producer but not the console producer?
- Why does the exception occur in the broker and not in the producer (Java code)?
The documentation for min.insync.replicas says:

If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend)

How does Kafka guarantee reliability in this case?
Answer

When producing with acks=all to a topic that has fewer in-sync replicas than min.insync.replicas, the producer should get a NotEnoughReplicas error.
The reason you are not seeing this behavior is that there are issues with both the console producer command and the Java code.
1. Console Producer
To enable acks=all in kafka-console-producer.sh, you need to specify the --request-required-acks flag:
./kafka-console-producer.sh --broker-list localhost:9092 \
--topic insync --request-required-acks all
This is because the --request-required-acks flag takes precedence over the values specified via --producer.config, and it defaults to 1.
2. Java code
The code you have pasted should be failing to send any messages, but with the current logic you will only get WARN log messages like:

Got error produce response with correlation id 15 on topic-partition , retrying ...
To get notified in your code, you need to check the result of send(), either by checking the Future it returns or by passing a Callback as the 2nd argument. Note also that NotEnoughReplicasException is a retriable exception, so with the latest client, by default, it will retry forever instead of notifying the calling code.
For example:
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("retries", "5");
configs.put("acks", "all");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "value");
    producer.send(record, (RecordMetadata metadata, Exception exc) -> {
        if (exc != null) {
            System.err.println(exc);
        }
    });
}
When the topic is below the minimum ISR, the producer will retry 5 times before failing the record. It will then call the lambda with the exception, so you'll get:
org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
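Checking the Future returned by send() surfaces the same error: get() throws an ExecutionException whose cause is the NotEnoughReplicasException. A minimal sketch of that alternative, assuming a local broker on localhost:9092 and an existing topic named insync (the class name is just for illustration; get() blocks until the broker acknowledges the record or the retries are exhausted):

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSendExample {
    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put("bootstrap.servers", "localhost:9092");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("retries", "5");
        configs.put("acks", "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("insync", "value");
            try {
                // Blocks until the send succeeds, or throws once retries are exhausted
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Written to offset " + metadata.offset());
            } catch (ExecutionException e) {
                // Below min.insync.replicas, the cause is a NotEnoughReplicasException
                System.err.println("Send failed: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

The trade-off is throughput: the blocking get() serializes sends, whereas the Callback style above keeps the producer asynchronous while still reporting the failure.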
So to conclude, min.insync.replicas is handled correctly, but you need to be careful to pass the correct arguments to the tool and to handle exceptions correctly in your Java logic.