Synchronising transactions between database and Kafka producer


Problem description

We have a micro-services architecture, with Kafka used as the communication mechanism between the services. Some of the services have their own databases. Say the user makes a call to Service A, which should result in a record (or set of records) being created in that service’s database. Additionally, this event should be reported to other services, as an item on a Kafka topic. What is the best way of ensuring that the database record(s) are only written if the Kafka topic is successfully updated (essentially creating a distributed transaction around the database update and the Kafka update)?

We are thinking of using spring-kafka (in a Spring Boot WebFlux service), and I can see that it has a KafkaTransactionManager, but from what I understand this is more about Kafka transactions themselves (ensuring consistency across the Kafka producers and consumers), rather than synchronising transactions across two systems (see here: "Kafka doesn't support XA and you have to deal with the possibility that the DB tx might commit while the Kafka tx rolls back."). Additionally, I think this class relies on Spring’s transaction framework which, at least as far as I currently understand, is thread-bound, and won’t work if using a reactive approach (e.g. WebFlux) where different parts of an operation may execute on different threads. (We are using reactive-pg-client, so are manually handling transactions, rather than using Spring’s framework.)

Some options I can think of:

  1. Don’t write the data to the database: only write it to Kafka. Then use a consumer (in Service A) to update the database. This seems like it might not be the most efficient, and will have problems in that the service which the user called cannot immediately see the database changes it should have just created.
  2. Don’t write directly to Kafka: write to the database only, and use something like Debezium to report the change to Kafka. The problem here is that the changes are based on individual database records, whereas the business significant event to store in Kafka might involve a combination of data from multiple tables.
  3. Write to the database first (if that fails, do nothing and just throw the exception). Then, when writing to Kafka, assume that the write might fail. Use the built-in auto-retry functionality to get it to keep trying for a while. If that eventually completely fails, try to write to a dead letter queue and create some sort of manual mechanism for admins to sort it out. And if writing to the DLQ fails (i.e. Kafka is completely down), just log it some other way (e.g. to the database), and again create some sort of manual mechanism for admins to sort it out. (A rough sketch of this option follows below.)
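
As a rough illustration of option 3, here is a minimal sketch using the plain Kafka producer API. The class name, topic names (`orders`, `orders.dlq`) and the `recordFailedEvent` helper are hypothetical placeholders for this example, not part of any framework, and the `publish` method is assumed to be called only after the database transaction has committed.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class EventPublisher {

    private static final String OUTBOUND_TOPIC = "orders";        // hypothetical topic
    private static final String DEAD_LETTER_TOPIC = "orders.dlq"; // hypothetical DLQ topic

    private final KafkaProducer<String, String> producer;

    public EventPublisher(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Let the producer retry internally for a while before reporting a failure.
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
        this.producer = new KafkaProducer<>(props);
    }

    /** Called only after the database write has committed successfully. */
    public void publish(String key, String eventJson) {
        producer.send(new ProducerRecord<>(OUTBOUND_TOPIC, key, eventJson), (metadata, ex) -> {
            if (ex == null) {
                return; // happy path: the event is in Kafka
            }
            // Retries exhausted: try the dead-letter topic so an admin can replay it later.
            producer.send(new ProducerRecord<>(DEAD_LETTER_TOPIC, key, eventJson), (dlqMeta, dlqEx) -> {
                if (dlqEx != null) {
                    // Kafka is completely unreachable: record the failure somewhere durable
                    // (e.g. a table in the service's own database) for manual follow-up.
                    recordFailedEvent(key, eventJson, dlqEx);
                }
            });
        });
    }

    private void recordFailedEvent(String key, String eventJson, Exception cause) {
        // Placeholder: persist the failed event for manual reconciliation.
    }
}
```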

Anyone got any thoughts or advice on the above, or able to correct any mistakes in my assumptions above?

Thanks in advance!

Answer

I'd suggest to use a slightly altered variant of approach 2.

Write into your database only, but in addition to the actual table writes, also write "events" into a special table within that same database; these event records would contain the aggregations you need. In the easiest way, you'd simply insert another entity e.g. mapped by JPA, which contains a JSON property with the aggregate payload. Of course this could be automated by some means of transaction listener / framework component.
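
A minimal sketch of such an event entity with JPA, assuming a Spring/JPA stack as mentioned in the question; the `OutboxEvent` class, its column names and the `outbox_events` table are illustrative choices for this example, not something mandated by Debezium.

```java
import java.time.Instant;
import java.util.UUID;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

// The "event" row that is inserted alongside the regular business tables,
// inside the same database transaction as the business data itself.
@Entity
@Table(name = "outbox_events")               // illustrative table name
public class OutboxEvent {

    @Id
    private UUID id = UUID.randomUUID();

    @Column(name = "aggregate_type")
    private String aggregateType;             // e.g. "Order"

    @Column(name = "aggregate_id")
    private String aggregateId;               // id of the business entity the event refers to

    @Column(name = "event_type")
    private String eventType;                 // e.g. "OrderCreated"

    @Column(name = "payload", length = 8192)
    private String payload;                   // JSON aggregate, possibly combining several tables

    @Column(name = "created_at")
    private Instant createdAt = Instant.now();

    protected OutboxEvent() {                 // required by JPA
    }

    public OutboxEvent(String aggregateType, String aggregateId,
                       String eventType, String payload) {
        this.aggregateType = aggregateType;
        this.aggregateId = aggregateId;
        this.eventType = eventType;
        this.payload = payload;
    }
}
```

The service method would then persist the business entity and an `OutboxEvent` within one transaction, e.g. `em.persist(order); em.persist(new OutboxEvent("Order", order.getId(), "OrderCreated", payloadJson));`, so that both either commit or roll back together.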

Then use Debezium to capture the changes just from that table and stream them into Kafka. That way you have both: eventually consistent state in Kafka (the events in Kafka may trail behind or you might see a few events a second time after a restart, but eventually they'll reflect the database state) without the need for distributed transactions, and the business level event semantics you're after.
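
On the capture side, the Debezium connector can be limited to just that table. Below is a hedged sketch of a Kafka Connect registration for the Postgres connector; the host, credentials and table name are placeholders, and exact property names differ between Debezium versions (older releases use `database.server.name` and `table.whitelist` instead of `topic.prefix` and `table.include.list`).

```json
{
  "name": "service-a-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "service-a-db",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "********",
    "database.dbname": "service_a",
    "topic.prefix": "service-a",
    "table.include.list": "public.outbox_events"
  }
}
```

Debezium also offers an outbox event router transformation that can route such event rows to topics per aggregate type; the posts linked below describe that approach in more detail.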

(Disclaimer: I'm the lead of Debezium; funnily enough I'm just in the process of writing a blog post discussing this approach in more detail)

Here are the posts:

https://debezium.io/blog/2018/09/20/materializing-aggregate-views-with-hibernate-and-debezium/

https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/
