KAFKA - What is the reason for getting ProducerFencedException during producer.send
Question
Trying to load around 50K messages into a KAFKA topic. At the beginning of a few runs I get the exception below, but not every time.
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679) ~[kafka-clients-2.0.0.jar:?]
at myPackage.persistUpdatesPostAction(MyCode.java:??) ~[aKafka.jar:?]
...
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer with
the same transactionalId, or the producer's transaction has been expired by the
broker.
The code block is as follows:
public void persistUpdatesPostAction(List<Message> messageList) {
    if ((messageList == null) || (messageList.isEmpty())) {
        return;
    }
    logger.createDebug("Messages in batch(postAction) : " + messageList.size());

    Producer<String, String> producer =
            KafkaUtils.getProducer(Thread.currentThread().getName());
    try {
        producer.beginTransaction();
        createKafkaBulkInsert1(producer, messageList, "Topic1");
        createKafkaBulkInsert2(producer, messageList, "Topic2");
        createKafkaBulkInsert3(producer, messageList, "Topic3");
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        producer.close();
        KafkaUtils.removeProducer(Thread.currentThread().getName());
    }
}
-----------
static Properties setPropertiesProducer() {
    Properties temp = new Properties();
    temp.put("bootstrap.servers", "localhost:9092");
    temp.put("acks", "all");
    temp.put("retries", 1);
    temp.put("batch.size", 16384);
    temp.put("linger.ms", 5);
    temp.put("buffer.memory", 33554432);
    temp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    temp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return temp;
}
public static Producer<String, String> getProducer(String aThreadId) {
    if ((producerMap.size() == 0) || (producerMap.get(aThreadId) == null)) {
        Properties temp = producerProps;
        temp.put("transactional.id", aThreadId);
        Producer<String, String> producer = new KafkaProducer<String, String>(temp);
        producerMap.put(aThreadId, producer);
        producer.initTransactions();
        return producer;
    }
    return producerMap.get(aThreadId);
}
public static void removeProducer(String aThreadId) {
    logger.createDebug("Removing Thread ID :" + aThreadId);
    if (producerMap.get(aThreadId) == null)
        return;
    producerMap.remove(aThreadId);
}
Answer
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
This exception message is not very helpful. I believe that it is trying to say that the broker no longer has any record of the transaction-id that is being sent by the client. This can either be because:
- Someone else was using the same transaction-id and committed it already. In my experience, this is less likely unless you are sharing transaction-ids between clients. We ensure that our ids are unique using UUID.randomUUID().
- The transaction timed out and was removed by broker automation.
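As a minimal sketch of the first point, a unique transactional.id can be generated per producer instance with UUID.randomUUID(); the "myApp-" prefix and property layout below are illustrative assumptions, not from the original code:

```java
import java.util.Properties;
import java.util.UUID;

public class UniqueTransactionalId {

    // Build producer properties with a transactional.id that is unique per
    // producer instance, so no two producers can fence each other.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // "myApp-" prefix is illustrative; the UUID suffix guarantees uniqueness.
        props.put("transactional.id", "myApp-" + UUID.randomUUID());
        return props;
    }

    public static void main(String[] args) {
        String id1 = producerProps().getProperty("transactional.id");
        String id2 = producerProps().getProperty("transactional.id");
        // Two producers built this way never share a transactional.id.
        System.out.println(id1.equals(id2)); // prints "false"
    }
}
```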
In our case, we were occasionally hitting transaction timeouts, which generated this exception. There are two properties that govern how long the broker will remember a transaction before aborting and forgetting it.
- transaction.max.timeout.ms -- A broker property that specifies the maximum number of milliseconds until a transaction is aborted and forgotten. The default in many Kafka versions seems to be 900000 (15 minutes). Kafka's documentation says:

The maximum allowed timeout for transactions. If a client's requested transaction time exceeds this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.
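For reference, this broker-side limit is set in the broker's server.properties; the value below is only an illustrative example, not a recommendation:

```
# server.properties (broker side) -- illustrative value only
# Allow clients to request transaction timeouts of up to 30 minutes.
transaction.max.timeout.ms=1800000
```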
- transaction.timeout.ms -- A producer client property that sets the transaction timeout in milliseconds when a transaction is created. The default in many Kafka versions seems to be 60000 (1 minute). Kafka's documentation says:

The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.
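On the client side, this timeout would be raised in the same Properties object the question builds; the 15-minute value here is an assumption for illustration:

```java
import java.util.Properties;

public class ClientTransactionTimeout {
    public static void main(String[] args) {
        Properties temp = new Properties();
        // Give long-running batches up to 15 minutes before the coordinator
        // aborts the transaction. Must not exceed the broker's
        // transaction.max.timeout.ms, or initTransactions() will fail.
        temp.put("transaction.timeout.ms", 900000);
        System.out.println(temp.get("transaction.timeout.ms")); // prints "900000"
    }
}
```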
If the transaction.timeout.ms property set in the client exceeds the transaction.max.timeout.ms property set in the broker, the producer will immediately throw something like the following exception:
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse
The transaction timeout is larger than the maximum value allowed by the broker
(as configured by transaction.max.timeout.ms).
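The relationship between the two settings can be sketched as a simple pre-flight check; the defaults are the ones quoted above, and the helper itself is an illustrative stand-in, not a Kafka API:

```java
public class TransactionTimeoutCheck {

    // Returns true when the client's requested transaction timeout would be
    // accepted by the broker, i.e. it does not exceed the broker's maximum.
    static boolean brokerWouldAccept(long clientTimeoutMs, long brokerMaxTimeoutMs) {
        return clientTimeoutMs <= brokerMaxTimeoutMs;
    }

    public static void main(String[] args) {
        long clientDefault = 60_000;      // transaction.timeout.ms default (1 minute)
        long brokerMaxDefault = 900_000;  // transaction.max.timeout.ms default (15 minutes)

        System.out.println(brokerWouldAccept(clientDefault, brokerMaxDefault)); // prints "true"
        // A 30-minute client timeout against the default broker maximum would
        // be rejected in InitProducerIdResponse:
        System.out.println(brokerWouldAccept(1_800_000, brokerMaxDefault)); // prints "false"
    }
}
```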