Spring Kafka 中的事务同步 [英] Transaction Synchronization in Spring Kafka

查看:45
本文介绍了Spring Kafka 中的事务同步的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想将 kafka 事务与存储库事务同步:

I want to synchronize a kafka transaction with a repository transaction:

@Transactional
public void syncTransaction(){
  myRepository.save(someObject)
  kafkaTemplate.send(someEvent)
}

自合并(https://github.com/spring-projects/spring-kafka/issues/373) 并且根据文档这是可能的.尽管如此,我在理解和实现该功能方面仍有问题.查看 https://docs.spring.io/中的示例spring-kafka/reference/html/#transaction-synchronization 我必须创建一个 MessageListenerContainer 来监听我自己的事件.我是否仍然需要使用 KafkaTemplate 发送我的事件?MessageListenerContainer 是否禁止发送给代理?

Since the merge (https://github.com/spring-projects/spring-kafka/issues/373) and according to the doc this is possible. Nevertheless i have problems to understand and implement that feature. Looking at the example in https://docs.spring.io/spring-kafka/reference/html/#transaction-synchronization I have to create a MessageListenerContainer to listen to my own events. Do I still have to send my events using the KafkaTemplate? Does the MessageListenerContainer prohibit the sending to the broker?

如果我理解正确,kafkaTemplate 和 kafkaTransactionManager 必须使用相同的 producerFactory,我必须在其中启用事务设置 transactionIdPrefix.在我的示例中,我必须将 messageListenerContainer 的 TransactionManager 设置为 DataSourceTransactionManager.对吗?

And if i understand correctly the kafkaTemplate und the kafkaTransactionManager have to use the same producerFactory in which i have to enable Transaction setting a transactionIdPrefix. And in my example i have to set the TransactionManager of the messageListenerContainer to the DataSourceTransactionManager. Is that correct?

从我的角度来看,我通过 kafkaTemplate 发送事件、监听我自己的事件并再次使用 kafkaTemplate 转发事件看起来很奇怪.

From my perspective it looks weird that I send an event via kafkaTemplate, listen to my own event and forward the event using the kafkaTemplate again.

如果我能得到一个 kafka 事务与存储库事务的简单同步的示例和解释,我真的会帮助我.

I would really help me if i can get an example for a simple synchronization of a kafka transaction with a repository transaction and an explanation.

推荐答案

如果侦听器容器配备了 KafkaTransactionManager,容器将创建一个生产者,该生产者将被任何下游 kafka 模板和容器将为您发送交易的偏移量.

If the listener container is provisioned with a KafkaTransactionManager, the container will create a producer which will be used by any downstream kafka template and the container will send the offsets to the transaction for you.

如果容器有其他事务管理器,则容器无法发送偏移量,因为它无权访问生产者(或模板).

If the container has some other transaction manager, the container can't send the offsets since it doesn't have access to the producer (or template).

另一种解决方案是使用 @Transactional(使用数据源 TM)注释您的方法,并使用 kafka TM 配置容器.

Another solution is to annotate your method with @Transactional (with the datasource TM) and configure the container with a kafka TM.

这样,您的 DB tx 将在线程返回容器之前提交,然后容器将偏移量发送到 kafka 事务并提交.

That way, your DB tx will commit just before the thread returns to the container which will then send the offsets to the kafka transaction and commit it.

框架测试用例示例.

这篇关于Spring Kafka 中的事务同步的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆