How to implement a microservice Event Driven architecture with Spring Cloud Stream Kafka and Database per service


Problem description


I am trying to implement an event driven architecture to handle distributed transactions. Each service has its own database and uses Kafka to send messages to inform other microservices about the operations.

An example:

 Order service -------> | Kafka |------->Payment Service
       |                                       |
Orders MariaDB DB                   Payment MariaDB Database

The Order service receives an order request. It has to store the new order in its DB and publish a message so that the Payment Service knows it has to charge for the item:

// NOTE: the Logger, the OrderSource binding type, and the OrderController class
// name are assumed from context; only the createOrder method is from the question.
private static final Logger logger = LoggerFactory.getLogger(OrderController.class);

@Autowired
private OrderBusiness orderBusiness;

@Autowired
private OrderSource orderSource; // Spring Cloud Stream source binding exposing output()

@PostMapping
public Order createOrder(@RequestBody Order order){
    logger.debug("createOrder()");
    // a.- Save the order in the DB
    orderBusiness.createOrder(order);
    // b.- Publish to the topic so that the Payment Service charges for the item.
    try{
        orderSource.output().send(MessageBuilder.withPayload(order).build());
    }catch(Exception e){
        logger.error("{}", e);
    }
    return order;
}

These are my doubts:

  1. Steps a.- (save in Order DB) and b.- (publish the message) should be performed in a transaction, atomically. How can I achieve that?
  2. This is related to the previous one: I send the message with: orderSource.output().send(MessageBuilder.withPayload(order).build()); This operation is asynchronous and ALWAYS returns true, no matter whether the Kafka broker is down. How can I know that the message has reached the Kafka broker?

Solution

Steps a.- (save in Order DB) and b.- (publish the message) should be performed in a transaction, atomically. How can I achieve that?

Kafka currently does not support transactions (and thus also no rollback or commit), which you'd need to synchronize something like this. So in short: you can't do what you want to do. This will change in the near-ish future, once KIP-98 is merged, but that might take some time yet. Also, even with transactions in Kafka, an atomic transaction across two systems is a very hard thing to do. Everything that follows will only be improved upon by transactional support in Kafka; it will still not entirely solve your issue. For that you would need to look into implementing some form of two-phase commit across your systems.

You can get somewhat close by configuring producer properties, but in the end you will have to choose between at-least-once or at-most-once delivery for one of your systems (MariaDB or Kafka).

Let's start with what you can do in Kafka to ensure delivery of a message, and further down we'll dive into your options for the overall process flow and what the consequences are.

Guaranteed delivery

You can configure how many brokers have to confirm receipt of your messages before the request is returned to you with the parameter acks: by setting this to all you tell the broker to wait until all replicas have acknowledged your message before returning an answer to you. This is still no 100% guarantee that your message will not be lost, since at that point it has only been written to the page cache, and there are theoretical scenarios, with a broker failing before it is persisted to disk, where the message might still be lost. But this is as good a guarantee as you are going to get. You can further reduce the risk of data loss by lowering the interval at which brokers force an fsync to disk (flush.messages and/or flush.ms), but please be aware that these values can bring heavy performance penalties with them.
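As an illustration only, a producer configured that way might look like the sketch below; the broker address and serializers are placeholders, and the myConfig name is chosen to match the snippet at the end of this answer:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties myConfig = new Properties();
myConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
myConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
myConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Wait until all in-sync replicas have acknowledged the write before returning.
myConfig.put(ProducerConfig.ACKS_CONFIG, "all");

Producer<String, String> producer = new KafkaProducer<>(myConfig);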

In addition to these settings you will need to wait for your Kafka producer to return the response for your request to you and check whether an exception occurred. This sort of ties into the second part of your question, so I will go into that further down. If the response is clean, you can be as sure as possible that your data got to Kafka and start worrying about MariaDB.

Everything we have covered so far only addresses how to ensure that Kafka got your messages, but you also need to write data into MariaDB, and this can fail as well, which would make it necessary to recall a message you potentially already sent to Kafka - and this you can't do.

So basically you need to choose one system in which you are better able to deal with duplicates/missing values (depending on whether or not you resend partial failures) and that will influence the order you do things in.

Option 1

In this option you initialize a transaction in MariaDB, then send the message to Kafka, wait for a response, and if the send was successful you commit the transaction in MariaDB. Should sending to Kafka fail, you can roll back your transaction in MariaDB and everything is dandy. If, however, sending to Kafka succeeds and your commit to MariaDB fails for some reason, then there is no way of getting the message back from Kafka. So you will either be missing a message in MariaDB or have a duplicate message in Kafka, if you resend everything later on.
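To make that concrete, here is a minimal sketch of option 1; it is only an illustration: orderRepository, producer, toJson, and the "orders" topic are made-up names, and rollbackFor is needed because the blocking get() throws checked exceptions:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.transaction.annotation.Transactional;

@Transactional(rollbackFor = Exception.class) // MariaDB transaction around both steps
public Order createOrder(Order order) throws Exception {
    // 1. Write the order inside the still-open MariaDB transaction.
    orderRepository.save(order);
    // 2. Send to Kafka and block until the broker answers; a failed or timed-out
    //    send throws here, rolling the database transaction back.
    producer.send(new ProducerRecord<>("orders", order.getId().toString(), toJson(order)))
            .get(10, TimeUnit.SECONDS);
    // 3. Returning normally commits MariaDB. If that commit itself fails, the
    //    Kafka message is already out - exactly the gap described above.
    return order;
}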

Option 2

This is pretty much just the other way around: write to MariaDB first, then send to Kafka, and compensate for a failed send by deleting the record again. Depending on your data model, you are probably better able to delete a record that was written in MariaDB than to recall a message from Kafka.
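Under the same illustrative assumptions as the option 1 sketch, option 2 commits the row first and compensates by deleting it again if the send fails:

public Order createOrder(Order order) throws Exception {
    // 1. Commit the order to MariaDB first (no transaction held open).
    orderRepository.save(order);
    try {
        // 2. Send to Kafka and wait for the broker's acknowledgement.
        producer.send(new ProducerRecord<>("orders", order.getId().toString(), toJson(order)))
                .get(10, TimeUnit.SECONDS);
    } catch (Exception e) {
        // 3. Compensate: the message never reached Kafka, so delete the row again.
        //    If this delete itself fails, the two systems diverge once more.
        orderRepository.delete(order);
        throw e;
    }
    return order;
}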

Of course you can mitigate both approaches by keeping track of failed sends and retrying just these later on, but all of that is more of a bandaid on the bigger issue.

Personally, I'd go with approach 1, since the chance of a commit failing should be somewhat smaller than that of the send itself, and I'd implement some sort of dupe check on the other side of Kafka.
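For illustration, such a dupe check on the consuming side could look like the sketch below, assuming every order carries a unique ID; the in-memory set is a stand-in for what would realistically be a table in the Payment service's own database, and paymentBusiness is a made-up name:

import java.util.HashSet;
import java.util.Set;

// Store of already-processed order IDs (in-memory stand-in; use a DB table in practice).
private final Set<String> processedOrderIds = new HashSet<>();

public void onOrderMessage(Order order) {
    // Idempotency check: Set.add returns false if the ID was already present,
    // so a redelivered duplicate is skipped instead of charging twice.
    if (!processedOrderIds.add(order.getId())) {
        return;
    }
    paymentBusiness.charge(order);
}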


This is related to the previous one: I send the message with: orderSource.output().send(MessageBuilder.withPayload(order).build()); This operation is asynchronous and ALWAYS returns true, no matter whether the Kafka broker is down. How can I know that the message has reached the Kafka broker?

Now, first off, I'll admit I am unfamiliar with Spring, so this may not be of use to you, but the following code snippet illustrates one way of checking produce responses for exceptions. By calling flush you block until all sends have finished (and either failed or succeeded), and can then check the results.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Producer<String, String> producer = new KafkaProducer<>(myConfig);
// Callbacks fire on the producer's I/O thread, so collect exceptions in a
// thread-safe list rather than a plain ArrayList.
final List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>());

for (MessageType message : messages) {
  producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      // A non-null exception means this record was not acknowledged by the broker.
      if (exception != null) {
        exceptionList.add(exception);
      }
    }
  });
}

// Block until every send above has either succeeded or failed.
producer.flush();

if (!exceptionList.isEmpty()) {
  // At least one send failed - handle or retry here.
}
