在producer.send期间获取ProducerFencedException的原因是什么? [英] What is reason for getting ProducerFencedException during producer.send?

查看:140
本文介绍了在producer.send期间获取ProducerFencedException的原因是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

尝试将大约5万条消息加载到KAFKA主题中.在少数运行开始时,但并非总是如此.

Trying to load around 50K messages into KAFKA topic. In the beginning of few runs getting below exception but not all the time.

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state  
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-2.0.0.jar:?]  
at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229) ~[kafka-clients-2.0.0.jar:?]  
at  org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679) ~[kafka-clients-2.0.0.jar:?]  
at myPackage.persistUpdatesPostAction(MyCode.java:??) ~[aKafka.jar:?]  
...  
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer with
the same transactionalId, or the producer's transaction has been expired by the
broker.  

代码块如下:

public void persistUpdatesPostAction(List<Message> messageList ) {
    if ((messageList == null) || (messageList.isEmpty())) {
        return;
    }
    logger.createDebug("Messages in batch(postAction) : "+ messageList.size());
    Producer<String,String> producer = KafkaUtils.getProducer(Thread.currentThread().getName());
    try {
        producer.beginTransaction();
        createKafkaBulkInsert1(producer, messageList, "Topic1");
        createKafkaBulkInsert2(producer, messageList, "Topic2");
        createKafkaBulkInsert3(producer, messageList, "Topic3");
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        producer.close();
        KafkaUtils.removeProducer(Thread.currentThread().getName());
    }
}

-----------

static Properties setPropertiesProducer() {
    Properties temp = new Properties();
    temp.put("bootstrap.servers", "localhost:9092");
    temp.put("acks", "all");
    temp.put("retries", 1);
    temp.put("batch.size", 16384);
    temp.put("linger.ms", 5);
    temp.put("buffer.memory", 33554432);
    temp.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
    temp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return temp;
}

public static Producer<String, String> getProducer(String aThreadId) {
    if ((producerMap.size() == 0) || (producerMap.get(aThreadId) == null)) {
        Properties temp = producerProps;
        temp.put("transactional.id", aThreadId);
        Producer<String, String> producer = new KafkaProducer<String, String>(temp);
        producerMap.put(aThreadId, producer);
        producer.initTransactions();
        return producer;
    }
    return producerMap.get(aThreadId);
}

public static void removeProducer(String aThreadId) {
    logger.createDebug("Removing Thread ID :" + aThreadId);
    if (producerMap.get(aThreadId) == null)
        return;
    producerMap.remove(aThreadId);
}

推荐答案

由以下原因引起:org.apache.kafka.common.errors.ProducerFencedException:生产者尝试以旧时代进行手术.要么有一个新的生产商相同的transactionalId,或者生产者的交易已被过期经纪人.

Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

此异常消息不是很有帮助.我相信,要说经纪人不再具有客户发送的交易ID的任何记录,这是尝试.这可能是因为:

This exception message is not very helpful. I believe that it is trying to say that the broker no longer has any record of the transaction-id that is being sent by the client. This can either be because:

  • 其他人正在使用相同的transaction-id并已提交.以我的经验,除非您在客户之间共享交易ID,否则这种可能性很小.我们使用 UUID.randomUUID()确保我们的ID是唯一的.
  • 交易超时,并被经纪人自动化删除.
  • Someone else was using the same transaction-id and committed it already. In my experience, this is less likely unless you are sharing transaction-ids between clients. We ensure that our ids are unique using UUID.randomUUID().
  • The transaction timed out and was removed by broker automation.

在我们的例子中,我们经常会因事务超时而产生此异常.有2个属性可控制经纪人在中止交易并将其遗忘之前记住交易的时间.

In our case, we were hitting transaction timeouts every so often that generated this exception. There are 2 properties that govern how long the broker will remember a transaction before aborting it and forgetting about it.

  • transaction.max.timeout.ms –一个 broker 属性,用于指定直到中止和忘记事务之前的最大毫秒数.许多Kafka版本的默认值似乎是900000(15分钟).卡夫卡的文件说:

  • transaction.max.timeout.ms -- A broker property that specifies the maximum number of milliseconds until a transaction is aborted and forgotten. Default in many Kafka versions seems to be 900000 (15 minutes). Documentation from Kafka says:

允许的最大交易超时.如果客户请求的交易时间超过了该时间,则经纪人将在InitProducerIdRequest中返回错误.这样可以防止客户的超时时间过长,从而使消费者无法阅读交易中包含的主题.

The maximum allowed timeout for transactions. If a client’s requested transaction time exceeds this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.

  • transaction.timeout.ms -一个生产者客户端属性,用于设置创建事务时的超时时间(以毫秒为单位).许多Kafka版本的默认值似乎是60000(1分钟).卡夫卡的文件说:

  • transaction.timeout.ms -- A producer client property that sets the timeout in milliseconds when a transaction is created. Default in many Kafka versions seems to be 60000 (1 minute). Documentation from Kafka says:

    在主动中止正在进行的事务之前,事务协调器将等待生产者更新事务状态的最长时间(以毫秒为单位).

    The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.

  • 如果客户端中设置的 transaction.timeout.ms 属性超过了代理中的 transaction.max.timeout.ms 属性,则生产者将立即抛出类似以下异常:

    If the transaction.timeout.ms property set in the client exceeds the transaction.max.timeout.ms property in the broker, the producer will immediately throw something like the following exception:

    org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse
    The transaction timeout is larger than the maximum value allowed by the broker 
    (as configured by transaction.max.timeout.ms).
    

    这篇关于在producer.send期间获取ProducerFencedException的原因是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆