如何在Kafka中使用事务以及如何使用abortTransaction? [英] How to use transactions in Kafka and how to use abortTransaction?

查看:27
本文介绍了如何在Kafka中使用事务以及如何使用abortTransaction?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 kafka 的新手,我使用 Kafka Producer Java api.面对卡夫卡的这个问题,Kafka:尝试从状态 COMMITTING_TRANSACTION 到状态 ABORTING_TRANSACTION 的转换无效.

Im new to kafka and I use Kafka Producer Java api. Facing this issue with Kafka, Kafka: Invalid transition attempted from state COMMITTING_TRANSACTION to state ABORTING_TRANSACTION.

人们已经写到 producer.abortTransaction() 应该只在没有交易正在进行时才被调用......知道如何检查是否有交易进行中吗?以及如何清除/阻止它们?

people have written that producer.abortTransaction() should be called only when there are no transactions in flight.... Any idea how to check whether there are transactions in flight? and how to clear/stop them?

这是我的代码:

try { 
  producer.send(record, new Callback() { 
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) { 
      if ( e != null){ 
        logger.info("Record was not sent due to kafka issue");
        throw new KafkaException("Record was not sent due to kafka issue");
      }
    }
  });
} catch (KafkaException e){
  producer.abortTransaction(); 
}

推荐答案

我需要实现的是检测 kafka 何时停止并在这种情况下清除所有缓冲区,以便在 kafka 再次启动时这些缓冲区中的记录不会出现在消费者端.

What I need to achieve is detecting when the kafka is stopped and in this case clear all the buffers so the records In these buffers do not appear in the consumer side when the kafka is started again.

在这种情况下,您通常会做的是应用 KafkaProducer:

What you usually would do in this case is to apply a transaction which is described in the Java Docs of the KafkaProducer:

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("transactional.id", "my-transactional-id");
 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

 producer.initTransactions();

 try {
     producer.beginTransaction();
     for (int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
     producer.commitTransaction();
 } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
     // We can't recover from these exceptions, so our only option is to close the producer and exit.
     producer.close();
 } catch (KafkaException e) {
     // For all other exceptions, just abort the transaction and try again.
     producer.abortTransaction();
 }
 producer.close();

这样,如果 isolation.level 设置为 read_committed,那么 100 条记录中的所有记录或不可见.

That way either all or none of the 100 records are visible to the consumer if isolation.level is set to read_committed.

您正在关闭 producer.close() 对于不可恢复的异常,例如

You are closing producer.close() for non-recoverable Exceptions such as

  • ProducerFencedException:此致命异常表明另一个具有相同 transactional.id 的生产者已启动.在任何给定时间只能有一个带有 transactional.id 的生产者实例,而最近的一个被启动围栏".以前的实例,以便它们不能再发出事务请求.遇到此异常时,必须关闭生产者实例.

  • ProducerFencedException: This fatal exception indicates that another producer with the same transactional.id has been started. It is only possible to have one producer instance with a transactional.id at any given time, and the latest one to be started "fences" the previous instances so that they can no longer make transactional requests. When you encounter this exception, you must close the producer instance.

OutOfOrderSequenceException:此异常表示代理收到了来自生产者的意外序列号,这意味着数据可能已丢失.如果生产者仅配置为幂等性(即如果设置了 enable.idempotence 并且没有配置 transactional.id),则可以使用相同的生产者实例继续发送,但这样做有可能对发送的记录进行重新排序.对于事务性生产者,这是一个致命错误,您应该关闭生产者.

OutOfOrderSequenceException: This exception indicates that the broker received an unexpected sequence number from the producer, which means that data may have been lost. If the producer is configured for idempotence only (i.e. if enable.idempotence is set and no transactional.id is configured), it is possible to continue sending with the same producer instance, but doing so risks reordering of sent records. For transactional producers, this is a fatal error and you should close the producer.

AuthorizationException:[不言自明]

这篇关于如何在Kafka中使用事务以及如何使用abortTransaction?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆