大众运输:在存在不同消息类型时确保消息处理顺序 [英] Mass Transit: ensure message processing order when there are different message types

查看:121
本文介绍了大众运输:在存在不同消息类型时确保消息处理顺序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是地铁的新手,我想了解一下它是否对我的情况有帮助. 我正在构建使用CQRS事件源体系结构实现的示例应用程序,并且需要服务总线以便将由命令堆栈创建的事件调度到查询堆栈非规范化器.

I'm new to Mass Transit and I would like to understand if it can helps with my scenario. I'm building a sample application implemented with a CQRS event sourcing architecture and I need a service bus in order to dispatch the events created by the command stack to the query stack denormalizers.

我们假设在我们的域中有一个聚合,我们将其称为照片,以及两个不同的域事件: PhotoUploaded PhotoArchived .

Let's suppose of having a single aggregate in our domain, let's call it Photo, and two different domain events: PhotoUploaded and PhotoArchived.

在这种情况下,我们有两种不同的消息类型,默认的大众运输行为是创建两种不同的RabbitMq交换:一种用于PhotoUploaded消息类型,另一种用于PhotoArchived消息类型.

Given this scenario, we have two different message types and the default Mass Transit behaviour is creating two different RabbitMq exchanges: one for the PhotoUploaded message type and the other for the PhotoArchived message type.

让我们假设有一个称为 PhotoDenormalizer 的反规范化器:该服务将同时使用这两种消息类型,因为无论何时上传或归档照片,都必须更新照片读取模型.

Let's suppose of having a single denormalizer called PhotoDenormalizer: this service will be a consumer of both message types, because the photo read model must be updated whenever a photo is uploaded or archived.

在默认的大众运输拓扑下,将有两种不同的交换方式,因此无法保证不同类型事件之间的消息处理顺序:我们唯一的保证是,所有相同类型的事件都将按顺序处理,但我们不能保证不同类型事件之间的处理顺序(请注意,考虑到我的示例的事件语义,处理顺序很重要).

Given the default Mass Transit topology, there will be two different exchanges so the message processing order cannot be guaranteed between events of different types: the only guarantee that we have is that all the events of the same type will be processed in order, but we cannot guarantee the processing order between events of different type (notice that, given the events semantic of my example, the processing order matters).

如何处理这种情况?地铁适合我的需求吗?我是否完全不了解域事件调度的要点?

How can I handle such a scenario ? Is Mass Transit suitable with my needs ? Am I completely missing the point with domain events dispatching ?

推荐答案

免责声明:这不是您问题的答案,而是预防性消息,为什么您不应该 做您打算做的事.

Disclaimer: this is not an answer to your question, but rather a preventive message why you should not do what you are planning to do.

尽管RMQ之类的消息代理和MassTransit之类的消息中间件库非常适合集成,但我强烈建议不要使用消息代理进行事件源.我可以参考我的旧答案 Event-sourcing:when (而不是)我应该使用Message Queue吗?解释其背后的原因.

Whilst message brokers like RMQ and messaging middleware libraries like MassTransit are perfect for integration, I strongly advise against using message brokers for event-sourcing. I can refer to my old answer Event-sourcing: when (and not) should I use Message Queue? that explains the reasons behind it.

您发现自己的原因之一-事件顺序将永远无法得到保证.

One of the reasons you have found yourself - event order will never be guaranteed.

另一个明显的原因是,根据通过消息代理发布的事件来构建读取模型,这有效地消除了重播的可能性,并消除了需要从头开始处理事件的新读取模型,但是它们得到的全部是正在现在发布的事件.

Another obvious reason is that building read models from events that are published via a message broker effectively removes the possibility for replay and to build new read models that would need to start processing events from the beginning of time, but all they get are events that are being published now.

聚合形成事务边界,因此每个命令都需要确保它在一个事务中完成.虽然MT支持交易中间件,但它仅保证您获得依赖项的交易支持它们,但不支持使用者体内的context.Publish(@event),因为RMQ不支持交易.您很有可能提交更改,而不会在读取端获取事件.因此,事件存储的经验法则是,您应该能够从存储中预订更改流 ,并且不发布代码中的事件,除非这些事件是集成事件而不是域事件.

Aggregates form transactional boundaries, so every command needs to guarantee that it completes within one transaction. Whilst MT supports the transaction middleware, it only guarantees that you get a transaction for dependencies that support them, but not for context.Publish(@event) in the consumer body, since RMQ doesn't support transactions. You get a good chance of committing changes and not getting events on the read side. So, the rule of thumb for event stores that you should be able to subscribe to the stream of changes from the store, and not publish events from your code, unless those are integration events and not domain events.

对于事件源,至关重要的是,每个读取模型在其计划的事件流中保留其自己的检查点.消息代理没有提供这种功能,因为检查点"实际上就是您的队列,并且一旦消息从队列中消失了-它永远消失了,就再也没有回来.

For event-sourcing, it is crucial that each read-model keeps its own checkpoint in the stream of events it is projecting. Message brokers don't give you that kind of power since the "checkpoint" is actually your queue and as soon as the message is gone from the queue - it is gone forever, there's no coming back.

关于实际问题:

您可以使用消息拓扑配置来设置同一实体为不同的消息命名,然后将它们发布到同一交易所,但这属于Chris在该页面上所写的滥用"类别.我没有尝试过,但是您绝对可以尝试.消息CLR类型是元数据的一部分,因此不应该存在反序列化问题.

You can use the message topology configuration to set the same entity name for different messages and then they'll be published to the same exchange, but that falls to the "abuse" category like Chris wrote on that page. I haven't tried that but you definitely can experiment. Message CLR type is part of the metadata, so there shouldn't be deserialization issues.

但是同样地,将消息放在同一交换机中不会给您任何订购保证,除非事实是所有消息都将落在使用服务的一个队列中.

But again, putting messages in the same exchange won't give you any ordering guarantees, except the fact that all messages will land in one queue for the consuming service.

您将至少必须根据您的聚合ID设置分区过滤器,以防止并行处理同一聚合的多个消息.顺便说一句,这对于集成也很有用.我们就是这样做的:

You will have to at least set the partitioning filter based on your aggregate id, to prevent multiple messages for the same aggregate from being processed in parallel. That, by the way, is also useful for integration. That's how we do it:

void AddHandler<T>(Func<ConsumeContext<T>, string> partition) where T : class
    => ep.Handler<T>(
        c => appService.Handle(c, aggregateStore), 
        hc => hc.UsePartitioner(8, partition));

AddHandler<InternalCommands.V1.Whatever>(c => c.Message.StreamGuid);

这篇关于大众运输:在存在不同消息类型时确保消息处理顺序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆