Synchronising transactions between database and Kafka producer

Question

We have a micro-services architecture, with Kafka used as the communication mechanism between the services. Some of the services have their own databases. Say the user makes a call to Service A, which should result in a record (or set of records) being created in that service’s database. Additionally, this event should be reported to other services, as an item on a Kafka topic. What is the best way of ensuring that the database record(s) are only written if the Kafka topic is successfully updated (essentially creating a distributed transaction around the database update and the Kafka update)?

We are thinking of using spring-kafka (in a Spring Boot WebFlux service), and I can see that it has a KafkaTransactionManager, but from what I understand this is more about Kafka transactions themselves (ensuring consistency across the Kafka producers and consumers), rather than synchronising transactions across two systems (see here: "Kafka doesn't support XA and you have to deal with the possibility that the DB tx might commit while the Kafka tx rolls back."). Additionally, I think this class relies on Spring’s transaction framework which, at least as far as I currently understand, is thread-bound, and won’t work if using a reactive approach (e.g. WebFlux) where different parts of an operation may execute on different threads. (We are using reactive-pg-client, so are manually handling transactions, rather than using Spring’s framework.)
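
For reference, this is roughly how a KafkaTransactionManager is declared with spring-kafka (a minimal sketch; the bean name and type parameters are illustrative). It wraps only the Kafka producer's transaction, so a separate transaction manager would still be needed for the database, and the two are not atomically coordinated:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class KafkaTxConfig {

    // Manages only the Kafka producer transaction; the ProducerFactory must have a
    // transaction-id prefix configured for producer transactions to be enabled at all.
    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }
}
```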

Some options I can think of:

  1. Don’t write the data to the database: only write it to Kafka. Then use a consumer (in Service A) to update the database. This seems like it might not be the most efficient, and will have problems in that the service which the user called cannot immediately see the database changes it should have just created.
  2. Don’t write directly to Kafka: write to the database only, and use something like Debezium to report the change to Kafka. The problem here is that the changes are based on individual database records, whereas the business significant event to store in Kafka might involve a combination of data from multiple tables.
  3. Write to the database first (if that fails, do nothing and just throw the exception). Then, when writing to Kafka, assume that the write might fail. Use the built-in auto-retry functionality to get it to keep trying for a while. If that eventually completely fails, try to write to a dead letter queue and create some sort of manual mechanism for admins to sort it out. And if writing to the DLQ fails (i.e. Kafka is completely down), just log it some other way (e.g. to the database), and again create some sort of manual mechanism for admins to sort it out. (See the sketch below.)
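
A rough sketch of option 3 (topic names, the 30-second timeout and the fallback method are made up for illustration; spring-kafka's KafkaTemplate is assumed):

```java
import java.util.concurrent.TimeUnit;

import org.springframework.kafka.core.KafkaTemplate;

// Illustrative only: the database record has already been committed by the caller;
// now try Kafka, fall back to a dead-letter topic, and finally to some other durable record.
public class EventPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public EventPublisher(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(String key, String payload) {
        try {
            // The producer's own retry settings (retries, delivery timeouts, etc.)
            // keep this attempt going "for a while" before it finally fails.
            kafkaTemplate.send("service-a.events", key, payload).get(30, TimeUnit.SECONDS);
        } catch (Exception mainTopicFailure) {
            try {
                // Main topic failed: park the event on a dead-letter topic for admins.
                kafkaTemplate.send("service-a.events.dlq", key, payload).get(30, TimeUnit.SECONDS);
            } catch (Exception dlqFailure) {
                // Kafka is completely down: record the event somewhere else
                // (e.g. a "failed_events" table or a log) for manual replay later.
                storeForManualProcessing(key, payload, dlqFailure);
            }
        }
    }

    private void storeForManualProcessing(String key, String payload, Exception cause) {
        // Hypothetical fallback, e.g. INSERT into a failed_events table.
    }
}
```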

Anyone got any thoughts or advice on the above, or able to correct any mistakes in my assumptions above?

Thanks in advance!

Answer

I'd suggest using a slightly altered variant of approach 2.

Write into your database only, but in addition to the actual table writes, also write "events" into a special table within that same database; these event records would contain the aggregations you need. In the simplest case, you'd just insert another entity, e.g. one mapped by JPA, which contains a JSON property with the aggregate payload. Of course this could be automated by some kind of transaction listener / framework component.
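
A minimal sketch of what such an "event" entity might look like (entity, table and field names here are made up for illustration, assuming JPA/Hibernate):

```java
import java.time.Instant;
import java.util.UUID;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

// Illustrative outbox entity: one row per business-level event, with the
// aggregated payload (possibly combining data from several tables) stored as JSON.
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    private UUID id = UUID.randomUUID();

    private String aggregateType;   // e.g. "Order"
    private String aggregateId;     // id of the root record the event is about
    private String eventType;       // e.g. "OrderCreated"
    private Instant createdAt = Instant.now();

    // JSON string with the aggregate payload
    private String payload;

    protected OutboxEvent() {       // for JPA
    }

    public OutboxEvent(String aggregateType, String aggregateId,
                       String eventType, String payload) {
        this.aggregateType = aggregateType;
        this.aggregateId = aggregateId;
        this.eventType = eventType;
        this.payload = payload;
    }
}
```

Persisting such an entity in the same local transaction as the actual table writes means either both are committed or neither is, with no distributed transaction involved.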

Then use Debezium to capture the changes just from that table and stream them into Kafka. That way you have both: eventually consistent state in Kafka (the events in Kafka may trail behind or you might see a few events a second time after a restart, but eventually they'll reflect the database state) without the need for distributed transactions, and the business level event semantics you're after.
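
For illustration, the connector would then be pointed at just that table; a rough Postgres connector registration might look like this (connection details are placeholders, and the exact property names depend on the Debezium version, e.g. newer releases use table.include.list instead of table.whitelist):

```json
{
  "name": "service-a-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "service-a-db",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "********",
    "database.dbname": "service_a",
    "database.server.name": "service-a",
    "table.whitelist": "public.outbox_events"
  }
}
```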

(Disclaimer: I'm the lead of Debezium; funnily enough I'm just in the process of writing a blog post discussing this approach in more detail)

Here are the posts:

https://debezium.io/blog/2018/09/20/materializing-aggregate-views-hibernate-and-debezium/

https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/
