如何在Kafka中使用事务以及如何使用abortTransaction? [英] How to use transactions in Kafka and how to use abortTransaction?

查看:340
本文介绍了如何在Kafka中使用事务以及如何使用abortTransaction?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是kafka的新手,我使用Kafka Producer Java api. 面对卡夫卡这个问题, Kafka: Invalid transition attempted from state COMMITTING_TRANSACTION to state ABORTING_TRANSACTION.

Im new to kafka and I use Kafka Producer Java api. Facing this issue with Kafka, Kafka: Invalid transition attempted from state COMMITTING_TRANSACTION to state ABORTING_TRANSACTION.

人们写道,只有在没有交易进行中时才应调用producer.abortTransaction(). 知道如何检查是否有交易进行中吗?以及如何清除/阻止它们?

people have written that producer.abortTransaction() should be called only when there are no transactions in flight.... Any idea how to check whether there are transactions in flight? and how to clear/stop them?

这是我的代码:

try { 
  producer.send(record, new Callback() { 
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) { 
      if ( e != null){ 
        logger.info("Record was not sent due to kafka issue");
        throw new KafkaException("Record was not sent due to kafka issue");
      }
    }
  });
} catch (KafkaException e){
  producer.abortTransaction(); 
}

推荐答案

我需要实现的是检测kafka何时停止,并在这种情况下清除所有缓冲区,以便当再次启动kafka时,这些缓冲区中的记录不会出现在使用者端.

What I need to achieve is detecting when the kafka is stopped and in this case clear all the buffers so the records In these buffers do not appear in the consumer side when the kafka is started again.

在这种情况下,通常要做的是应用

What you usually would do in this case is to apply a transaction which is described in the Java Docs of the KafkaProducer:

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("transactional.id", "my-transactional-id");
 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

 producer.initTransactions();

 try {
     producer.beginTransaction();
     for (int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
     producer.commitTransaction();
 } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
     // We can't recover from these exceptions, so our only option is to close the producer and exit.
     producer.close();
 } catch (KafkaException e) {
     // For all other exceptions, just abort the transaction and try again.
     producer.abortTransaction();
 }
 producer.close();

如果isolation.level设置为read_committed,则消费者可以看到100条记录的全部或全部.

That way either all or none of the 100 records are visible to the consumer if isolation.level is set to read_committed.

您将关闭producer.close()的不可恢复的异常,例如

You are closing producer.close() for non-recoverable Exceptions such as

  • ProducerFencedException :此致命异常表示已经启动了另一个具有相同transactional.id的生产者.在任何给定时间只能有一个带有transactional.id的生产者实例,而最后一个要启动的实例是栅栏".以前的实例,这样它们就不能再发出事务请求.遇到此异常时,必须关闭生产者实例.

  • ProducerFencedException: This fatal exception indicates that another producer with the same transactional.id has been started. It is only possible to have one producer instance with a transactional.id at any given time, and the latest one to be started "fences" the previous instances so that they can no longer make transactional requests. When you encounter this exception, you must close the producer instance.

OutOfOrderSequenceException :此异常表明代理从生产者接收到意外的序列号,这意味着数据可能已丢失.如果生产者仅配置为幂等(即如果设置了enable.idempotence并且未配置transactional.id),则可以继续使用同一生产者实例进行发送,但是这样做可能会重新排序已发送记录的顺序.对于事务性生产者,这是一个致命错误,您应该关闭生产者.

OutOfOrderSequenceException: This exception indicates that the broker received an unexpected sequence number from the producer, which means that data may have been lost. If the producer is configured for idempotence only (i.e. if enable.idempotence is set and no transactional.id is configured), it is possible to continue sending with the same producer instance, but doing so risks reordering of sent records. For transactional producers, this is a fatal error and you should close the producer.

AuthorizationException :[不言自明]

这篇关于如何在Kafka中使用事务以及如何使用abortTransaction?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆