How to implement a microservice Event Driven architecture with Spring Cloud Stream Kafka and Database per service

Problem description

I am trying to implement an event driven architecture to handle distributed transactions. Each service has its own database and uses Kafka to send messages to inform other microservices about the operations.

An example:

Order Service -------> | Kafka | -------> Payment Service
      |                                         |
Orders MariaDB DB                      Payment MariaDB DB

The Order service receives an order request. It has to store the new Order in its DB and publish a message so that Payment Service realizes it has to charge for the item:

private OrderBusiness orderBusiness;

@PostMapping
public Order createOrder(@RequestBody Order order){
    logger.debug("createOrder()");
    //a.- Save the order in the DB
    orderBusiness.createOrder(order);
    //b. Publish in the topic so that Payment Service charges for the item.
    try{
        orderSource.output().send(MessageBuilder.withPayload(order).build());
    }catch(Exception e){
        logger.error("{}", e);
    }
    return order;
}

These are my doubts:

  1. Steps a.- (save in the Order DB) and b.- (publish the message) should be performed in a transaction, atomically. How can I achieve that?
  2. This is related to the previous one: I send the message with orderSource.output().send(MessageBuilder.withPayload(order).build()); This operation is asynchronous and ALWAYS returns true, no matter if the Kafka broker is down. How can I know that the message has reached the Kafka broker?

Recommended answer

Steps a.- (save in Order DB) and b.- (publish the message) should be performed in a transaction, atomically. How can I achieve that?

Kafka currently does not support transactions (and thus also no rollback or commit), which you would need to synchronize something like this. So, in short: you can't do what you want to do. This will change in the near-ish future when KIP-98 is merged, but that might take some time yet. Also, even with transactions in Kafka, an atomic transaction across two systems is a very hard thing to do; everything that follows will be improved upon by transactional support in Kafka, but it will still not entirely solve your issue. For that you would need to look into implementing some form of two-phase commit across your systems.

You can get somewhat close by configuring producer properties, but in the end you will have to choose between at least once or at most once for one of your systems (MariaDB or Kafka).

Let's start with what you can do in Kafka to ensure delivery of a message, and further down we'll dive into your options for the overall process flow and what the consequences are.

Guaranteed delivery

You can configure how many brokers have to confirm receipt of your messages before the request is returned to you with the parameter acks: by setting this to all you tell the broker to wait until all replicas have acknowledged your message before returning an answer to you. This is still no 100% guarantee that your message will not be lost, since it has only been written to the page cache and there are theoretical scenarios with a broker failing before it is persisted to disk, where the message might still be lost. But this is as good a guarantee as you are going to get. You can further reduce the risk of data loss by lowering the interval at which brokers force an fsync to disk (flush.messages and/or flush.ms), but please be aware that these values can bring heavy performance penalties with them.
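To make that concrete, here is a minimal sketch of these producer settings using the plain Java client (the broker address and the retries value are illustrative assumptions, not taken from the question):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class SafeProducerConfig {
  public static Producer<String, String> create() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Wait until all in-sync replicas have acknowledged the message
    // before the send is considered successful.
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    // Retry transient failures instead of failing the send immediately.
    props.put(ProducerConfig.RETRIES_CONFIG, 3);
    return new KafkaProducer<>(props);
  }
}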

In addition to these settings, you will need to wait for your Kafka producer to return the response to your request and check whether an exception occurred. This sort of ties into the second part of your question, so I will go into that further down. If the response is clean, you can be as sure as possible that your data got to Kafka and start worrying about MariaDB.

Everything we have covered so far only addresses how to ensure that Kafka got your messages, but you also need to write data into MariaDB, and this can fail as well, which would make it necessary to recall a message you potentially already sent to Kafka - and this you can't do.

So basically you need to choose one system in which you are better able to deal with duplicates/missing values (depending on whether or not you resend partial failures) and that will influence the order you do things in.

Option 1

In this option you initialize a transaction in MariaDB, then send the message to Kafka, wait for a response, and if the send was successful you commit the transaction in MariaDB. Should sending to Kafka fail, you can roll back your transaction in MariaDB and everything is dandy. If, however, sending to Kafka is successful and your commit to MariaDB fails for some reason, then there is no way of getting back the message from Kafka. So you will either be missing a message in MariaDB or have a duplicate message in Kafka if you resend everything later on.
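As an illustration only, here is a rough sketch of that flow with plain JDBC and the plain Kafka producer; the table, topic, and column names are made-up placeholders and error handling is reduced to the essentials:

import java.sql.Connection;
import java.sql.PreparedStatement;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrderWriter {
  private final Producer<String, String> producer;

  public OrderWriter(Producer<String, String> producer) {
    this.producer = producer;
  }

  public void createOrder(Connection conn, String orderId, String payload) throws Exception {
    conn.setAutoCommit(false); // open the MariaDB transaction first
    try (PreparedStatement ps =
             conn.prepareStatement("INSERT INTO orders (id, payload) VALUES (?, ?)")) {
      ps.setString(1, orderId);
      ps.setString(2, payload);
      ps.executeUpdate();
      // Block until Kafka confirms or rejects the send.
      producer.send(new ProducerRecord<>("orders", orderId, payload)).get();
    } catch (Exception e) {
      // Insert or send failed: the DB transaction is still open,
      // so rolling back leaves both systems clean.
      conn.rollback();
      throw e;
    }
    // The message is now in Kafka. If THIS commit fails, the message
    // cannot be recalled - that is the residual gap described above.
    conn.commit();
  }
}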

Option 2

This is pretty much just the other way around, but you are probably better able to delete a message that was written in MariaDB, depending on your data model.
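A sketch of option 2 as a companion method on the OrderWriter class above; the compensating delete at the end is the part that can itself fail and leave the two systems inconsistent:

// Option 2: commit to MariaDB first, then send to Kafka, and
// compensate with a delete if the send fails.
public void createOrderDbFirst(Connection conn, String orderId, String payload) throws Exception {
  conn.setAutoCommit(false);
  try (PreparedStatement ps =
           conn.prepareStatement("INSERT INTO orders (id, payload) VALUES (?, ?)")) {
    ps.setString(1, orderId);
    ps.setString(2, payload);
    ps.executeUpdate();
  }
  conn.commit(); // the order is now durable in MariaDB
  try {
    producer.send(new ProducerRecord<>("orders", orderId, payload)).get();
  } catch (Exception e) {
    // Compensating action: delete the row that was just committed.
    // If this delete fails as well, the systems end up inconsistent.
    try (PreparedStatement del = conn.prepareStatement("DELETE FROM orders WHERE id = ?")) {
      del.setString(1, orderId);
      del.executeUpdate();
    }
    conn.commit();
    throw e;
  }
}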

Of course you can mitigate both approaches by keeping track of failed sends and retrying just these later on, but all of that is more of a bandaid on the bigger issue.

Personally I'd go with approach 1, since the chance of a commit failing should be somewhat smaller than that of the send itself, and implement some sort of dupe check on the other side of Kafka.
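Such a dupe check can be as simple as inserting each message's unique ID into a dedicated table and letting the primary-key constraint reject duplicates. A minimal sketch, assuming every message carries a unique ID and a processed_messages table exists (both assumptions are mine, for illustration):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class DupeCheck {
  // Returns true if this message ID has not been seen before. The
  // primary-key constraint on processed_messages.id makes the check
  // atomic: inserting a duplicate simply fails.
  public static boolean firstTimeSeen(Connection conn, String messageId) throws SQLException {
    try (PreparedStatement ps =
             conn.prepareStatement("INSERT INTO processed_messages (id) VALUES (?)")) {
      ps.setString(1, messageId);
      ps.executeUpdate();
      return true;
    } catch (SQLException e) {
      // Treated as a duplicate-key violation here; real code should
      // inspect the SQLState/vendor error code before swallowing it.
      return false;
    }
  }
}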

This is related to the previous one: I send the message with: orderSource.output().send(MessageBuilder.withPayload(order).build()); This operation is asynchronous and ALWAYS returns true, no matter if the Kafka broker is down. How can I know that the message has reached the Kafka broker?

Now first off, I'll admit I am unfamiliar with Spring, so this may not be of use to you, but the following code snippet illustrates one way of checking produce responses for exceptions. By calling flush you block until all sends have finished (and either failed or succeeded) and then check the results.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// myConfig, MessageType and messages are placeholders from the original snippet.
Producer<String, String> producer = new KafkaProducer<>(myConfig);
// Callbacks run on the producer's I/O thread, so use a synchronized list
// instead of a plain ArrayList to collect failures safely.
final List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>());

for (MessageType message : messages) {
  producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      // A non-null exception means this particular send failed.
      if (exception != null) {
        exceptionList.add(exception);
      }
    }
  });
}

// flush() blocks until every queued send has completed, failed or succeeded.
producer.flush();

if (!exceptionList.isEmpty()) {
  // at least one send failed - handle or retry here
}
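As a closing pointer on the Spring side (which, as said, I am not familiar with, so treat this as an assumption to verify rather than a confirmed answer): the Spring Cloud Stream Kafka binder documents a producer property called sync which, when set to true on the output binding, is supposed to make send() block until the broker acknowledges the record instead of always returning true. That may be the binder-level equivalent of the callback check shown above.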
