Event sourcing with Kafka streams


Problem description

I'm trying to implement a simple CQRS/event sourcing proof of concept on top of Kafka streams (as described in https://www.confluent.io/blog/event-sourcing-using-apache-kafka/)

I have 4 basic parts:

  1. commands topic, which uses the aggregate ID as the key for sequential processing of commands per aggregate
  2. events topic, to which every change in aggregate state is published (again, the key is the aggregate ID). This topic has a retention policy of "never delete"
  3. A KTable to reduce aggregate state and save it to a state store

    events topic stream ->
    group to a Ktable by aggregate ID ->
    reduce aggregate events to current state ->
    materialize as a state store
    

  4. commands processor - a commands stream, left-joined with the aggregate state KTable. For each entry in the resulting stream, use a function (command, state) => events to produce the resulting events and publish them to the events topic (a sketch of this topology follows the list)
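To make this concrete, here is a minimal sketch of that topology in the Kafka Streams DSL. The topic names ("commands", "events") and the store name come from the list above; the String serdes and the applyEvent/decide helpers are hypothetical placeholders for real serialization and domain logic:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.state.KeyValueStore;

    import java.util.List;
    import java.util.Properties;

    public class CommandProcessorTopology {

        public static KafkaStreams build() {
            StreamsBuilder builder = new StreamsBuilder();

            // Parts 2 + 3: reduce the events topic into a KTable of current
            // aggregate state, materialized as the state store "aggregate-store".
            KTable<String, String> aggregateState = builder
                .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                .reduce(CommandProcessorTopology::applyEvent,
                        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate-store"));

            // Parts 1 + 4: left-join each command with the current aggregate
            // state (null if no events exist yet for this aggregate ID) and
            // publish the resulting events back to the events topic.
            builder.stream("commands", Consumed.with(Serdes.String(), Serdes.String()))
                .leftJoin(aggregateState, CommandProcessorTopology::decide)
                .flatMapValues(events -> events)  // zero or more events per command
                .to("events", Produced.with(Serdes.String(), Serdes.String()));

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "command-processor");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            return new KafkaStreams(builder.build(), props);
        }

        // Hypothetical domain logic: fold one event into the aggregate state.
        private static String applyEvent(String state, String event) {
            return event;  // placeholder: real code would merge event into state
        }

        // Hypothetical domain logic: validate a command against the current
        // state and return the events it produces (empty list = rejected).
        private static List<String> decide(String command, String state) {
            if (state != null && state.contains("Deleted")) {
                return List.of();  // aggregate already deleted: reject
            }
            return List.of(command + "WasApplied");
        }
    }

Note that the leftJoin reads whatever the KTable has materialized so far, which is exactly where the ordering problem described below comes from.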

The question is - is there a way to make sure I have the latest version of the aggregate in the state store?

I want to reject a command if it violates business rules (for example - a command to modify an entity is not valid if the entity was marked as deleted). But if a DeleteCommand is published with a ModifyCommand right after it, the delete command will produce the DeletedEvent, but when the ModifyCommand is processed, the state loaded from the state store might not reflect that yet, and conflicting events will be published.

I don't mind sacrificing command processing throughput; I'd rather have the consistency guarantees (since everything is grouped by the same key and should end up in the same partition).

Hope that was clear :) Any suggestions?

Solution

I don't think Kafka is good for CQRS and event sourcing yet, the way you described it, because it lacks a (simple) way of ensuring protection from concurrent writes. This article talks about this in detail.

What I mean by "the way you described it" is that you expect a command to generate zero or more events or to fail with an exception; this is classical CQRS with event sourcing. Most people expect this kind of architecture.

You could, however, do event sourcing in a different style. Your command handlers could yield an event for every command that is received (e.g. DeleteWasAccepted). Then, an event handler could eventually handle that event in an event-sourced way (by rebuilding the aggregate's state from its event stream) and emit other events (e.g. ItemDeleted or ItemDeletionWasRejected). So commands are fire-and-forget, sent asynchronously; the client does not wait for an immediate response. It waits, however, for an event describing the outcome of its command execution. A sketch of this two-step flow follows.
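A minimal sketch of the two steps: the accepted/outcome event names follow this answer (DeleteWasAccepted, ItemDeleted, ItemDeletionWasRejected), the modify-side names are hypothetical counterparts, and the plain strings and in-memory event stream stand in for real serialized events on Kafka topics:

    import java.util.List;

    public class TwoStepEventSourcing {

        // Step 1: the command handler does not validate; it only records that
        // the command was accepted (fire-and-forget from the client's view).
        static String handleCommand(String command) {
            return command.equals("Delete") ? "DeleteWasAccepted" : "ModifyWasAccepted";
        }

        // Step 2: the event handler rebuilds the aggregate's state from its
        // own event stream and only then decides the real outcome.
        static String handleAccepted(List<String> eventStream, String accepted) {
            boolean deleted = eventStream.contains("ItemDeleted");  // rebuilt state
            if (accepted.equals("DeleteWasAccepted")) {
                return deleted ? "ItemDeletionWasRejected" : "ItemDeleted";
            }
            return deleted ? "ItemModificationWasRejected" : "ItemModified";
        }

        public static void main(String[] args) {
            List<String> stream = List.of("ItemCreated", "ItemDeleted");
            // A Modify arriving after a Delete is rejected deterministically,
            // because the handler replays the stream before deciding.
            System.out.println(handleAccepted(stream, "ModifyWasAccepted"));
            // prints: ItemModificationWasRejected
        }
    }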

An important aspect is that the event handler must process events from the same aggregate serially (exactly once and in order). This can be implemented using a single Kafka consumer group; a minimal consumer loop is sketched below. You can learn more about this architecture in this video.
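As a sketch of that last point: a single consumer group over the accepted-commands topic (a hypothetical name), keyed by aggregate ID, gives per-aggregate ordering, because each partition is owned by exactly one group member. Note that a plain consumer group alone gives at-least-once delivery; the "exactly once" part additionally needs idempotent handlers or Kafka transactions:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    public class EventHandlerLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // One consumer group: each partition (hence each aggregate ID) is
            // read by exactly one member, so events per aggregate stay in order.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "event-handlers");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Commit manually after processing: at-least-once on the handler
            // side, to be paired with idempotent event handling.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("accepted-commands"));  // hypothetical topic
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        process(record.key(), record.value());  // serial, in partition order
                    }
                    consumer.commitSync();
                }
            }
        }

        private static void process(String aggregateId, String event) {
            // Rebuild the aggregate from its event stream and emit outcome events.
        }
    }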
