无法使用ChainedKafkaTransaction同步Kafka和MQ事务 [英] Unable to synchronise Kafka and MQ transactions usingChainedKafkaTransaction

查看:17
本文介绍了无法使用ChainedKafkaTransaction同步Kafka和MQ事务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们有一个 Spring Boot 应用程序,它使用来自 IBM MQ 的消息进行一些转换并将结果发布到 Kafka 主题.为此,我们使用 https://spring.io/projects/spring-kafka.我知道 Kafka 不支持 XA;然而,在文档中,我发现了一些关于使用 ChainedKafkaTransactionManager 链接多个事务管理器并同步事务的输入.同一文档还提供了一个示例,说明如何在从 Kafka 读取消息并将它们存储在数据库中时如何同步 Kafka 和数据库.

We have a spring boot application which consumes messages from IBM MQ does some transformation and publishes the result to a Kafka topic. We use https://spring.io/projects/spring-kafka for this. I am aware that Kafka does not supports XA; however, in the documentation I found some inputs about using a ChainedKafkaTransactionManager to chain multiple transaction managers and synchronise the transactions. The same documentation also provides an example about how to synchronise Kafka and database while reading messages from Kafka and storing them in the database.

我在我的案例中遵循相同的示例,并将 JmsTransactionManagerKafkaTransactionManager 链接到 ChainedKafkaTransactionManager 的保护伞下.bean 定义如下:

I follow the same example in my se case and chained the JmsTransactionManager with KafkaTransactionManager under the umbrella of a ChainedKafkaTransactionManager. The bean definitions follows below:

@Bean({"mqListenerContainerFactory"})
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(this.connectionFactory());
    factory.setTransactionManager(this.jmsTransactionManager());
    return factory;
}

@Bean
public JmsTransactionManager jmsTransactionManager() {
    return new JmsTransactionManager(this.connectionFactory());
}

@Bean("chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<?, ?> chainedKafkaTransactionManager(
        JmsTransactionManager jmsTransactionManager, KafkaTransactionManager kafkaTransactionManager) {

    return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, jmsTransactionManager);
}

@Transactional(transactionManager = "chainedKafkaTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "${myApp.sourceQueue}", containerFactory = "mqListenerContainerFactory")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
    // Processing the message here then publishing it to Kafka using KafkaTemplate
    kafkaTemplate.send(sourceTopic,transformedMessage);

    // Then throw an exception just to test the transaction behaviour
    throw new RuntimeException("Not good Pal!");
}

在运行应用程序时,发生的情况是他的消息不断回滚到 MQ 队列中,但消息在 Kafka 主题中不断增长,这对我来说意味着 kafkaTemplate 交互不会回滚.

When running the application what is happening is that he message keep getting rollbacked into the MQ Queue but messages keep growing in Kafka topic which means to me that kafkaTemplate interaction does not get rollbacked.

如果我根据文档理解得很好,这不应该是这种情况.如果事务处于活动状态,则在事务范围内执行的任何 KafkaTemplate 操作都使用事务的 Producer."

If I understand well according with the documentation this should not be the case. "If a transaction is active, any KafkaTemplate operations performed within the scope of the transaction use the transaction’s Producer."

在我们的 application.yaml 中,我们通过设置 spring.kafka.producer.transaction-id-prefix

In our application.yaml we configured the Kafka producer to use transactions by setting up spring.kafka.producer.transaction-id-prefix

问题是我在这里缺少什么以及我应该如何解决它.预先感谢您的意见.

The question is what I am missing here and how should I fix it. Thank you in advance for your inputs.

推荐答案

消费者默认可以看到未提交的记录;将 isolation.level 消费者属性设置为 read_committed 以避免从回滚事务中接收记录.

Consumers can see uncommitted records by default; set the isolation.level consumer property to read_committed to avoid receiving records from rolled-back transactions.

这篇关于无法使用ChainedKafkaTransaction同步Kafka和MQ事务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆