EventSourced Saga实现 [英] EventSourced Saga Implementation

查看:120
本文介绍了EventSourced Saga实现的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经写了一个事件源聚合,现在实现了一个事件源Saga ...我注意到两者都是相似的,并创建了一个事件源对象作为派生两者的基类.

I have written an Event Sourced Aggregate and now implemented an Event Sourced Saga... I have noticed the two are similair and created an event sourced object as a base class from which both derive.

我在这里看到了一个演示 http ://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-ii-of-ii/,但可能会出现问题,因为如果发生进程崩溃,命令可能会丢失因为命令的发送不在写事务之外?

I have seen one demo here http://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-ii-of-ii/ but feel there may be an issue as Commands could be lost in the event of a process crash as the sending of commands is outside the write transaction?

public void Save(ISaga saga)
{
    var events = saga.GetUncommittedEvents();
    eventStore.Write(new UncommittedEventStream
    {
        Id = saga.Id,
        Type = saga.GetType(),
        Events = events,
        ExpectedVersion = saga.Version - events.Count
    });

    foreach (var message in saga.GetUndispatchedMessages())
        bus.Send(message); // can be done in different ways

    saga.ClearUncommittedEvents();
    saga.ClearUndispatchedMessages();
}

相反,我使用的是格雷格·杨(Greg Young)的EventStore,当我保存EventSourcedObject(聚合或传奇)时,序列如下:

Instead I am using Greg Young's EventStore and when I save an EventSourcedObject (either an aggregate or a saga) the sequence is as follows:

  1. 存储库获取新的MutatingEvents列表.
  2. 将它们写入流.
  3. 将流写入并提交到流时,EventStore会触发新事件.
  4. 我们从EventStore监听事件,并在EventHandlers中处理它们.

我正在实现传奇的两个方面:

I am implementing the two aspects of a saga:

  1. 接收事件,该事件可能是过渡状态,而这又可能发出命令.
  2. 要有一个警报,以便将来在某个时候(通过外部计时器服务)在我们回叫).
  1. To take in events, which may transition state, which in turn may emit commands.
  2. To have an alarm where at some point in the future (via an external timer service) we can be called back).

问题

  1. 据我了解,事件处理程序不应发出命令(如果命令失败会发生什么?)-但我可以接受上述内容,因为Saga是控制创建内容的实际方法命令(通过对事件的响应)通过此 event 代理,命令发送的任何失败都可以在外部处理(在处理CommandEmittedFromSaga并在命令失败后重新发送的外部EventHandler中)?

  1. As I understand event handlers should not emit commands (what happens if the command fails?) - but am I OK with the above since the Saga is the actual thing controlling the creation of commands (in reaction to events) via this event proxy, and any failure of Command sending can be handled externally (in the external EventHandler that deals with CommandEmittedFromSaga and resends if the command fails)?

还是我忘记包装事件并将本机CommandsEvents存储在同一流中(与基类Message混合-Saga会同时使用Command和Events,一个Aggregate只会使用Event) ?

Or do I forget wrapping events and store native Commands and Events in the same stream (intermixed with a base class Message - the Saga would consume both Commands and Events, an Aggregate would only consume Events)?

网络上是否还有其他有关实施事件来源Sagas的参考资料?我能理智地检查我的想法吗?

Any other reference material on the net for implementation of event sourced Sagas? Anything I can sanity check my ideas against?

下面是一些背景代码.

Saga发出要运行的命令(包装在CommandEmittedFromSaga事件中)

下面的命令包含在事件中:

Command below is wrapped inside event:

public class CommandEmittedFromSaga : Event
{
    public readonly Command Command;
    public readonly Identity SagaIdentity;
    public readonly Type SagaType;

    public CommandEmittedFromSaga(Identity sagaIdentity, Type sagaType, Command command)
    {
        Command = command;
        SagaType = sagaType;
        SagaIdentity = sagaIdentity;
    }
}

Saga在将来的某个时间点请求回调(AlarmRequestedBySaga事件)

警报回调请求包装在一个事件的旁边,并且将在请求的时间或之后向该事件发送回事件和事件:

Alarm callback request is wrapped onside an event, and will fire back and event to the Saga on or after the requested time:

public class AlarmRequestedBySaga : Event
{
    public readonly Event Event;
    public readonly DateTime FireOn;
    public readonly Identity Identity;
    public readonly Type SagaType;

    public AlarmRequestedBySaga(Identity identity, Type sagaType, Event @event, DateTime fireOn)
    {
        Identity = identity;
        SagaType = sagaType;
        Event = @event;
        FireOn = fireOn;
    }
}

或者,我可以将Command和Events都存储在基本类型Message的同一流中

public abstract class EventSourcedSaga
{
    protected EventSourcedSaga() { }

    protected EventSourcedSaga(Identity id, IEnumerable<Message> messages)
    {
        Identity = id;

        if (messages == null) throw new ArgumentNullException(nameof(messages));

        var count = 0;

        foreach (var message in messages)
        {
            var ev = message as Event;
            var command = message as Command;

            if(ev != null) Transition(ev);
            else if(command != null) _messages.Add(command);
            else throw new Exception($"Unsupported message type {message.GetType()}");

            count++;
        }

        if (count == 0)
            throw new ArgumentException("No messages provided");

        // All we need to know is the original number of events this
        // entity has had applied at time of construction.
        _unmutatedVersion = count;
        _constructing = false;
    }

    readonly IEventDispatchStrategy _dispatcher = new EventDispatchByReflectionStrategy("When");
    readonly List<Message> _messages = new List<Message>();
    readonly int _unmutatedVersion;
    private readonly bool _constructing = true;
    public readonly Identity Identity;

    public IList<Message> GetMessages()
    {
        return _messages.ToArray();
    }

    public void Transition(Event e)
    {
        _messages.Add(e);
        _dispatcher.Dispatch(this, e);
    }

    protected void SendCommand(Command c)
    {
        // Don't add a command whilst we are in the constructor. Message
        // state transition during construction must not generate new
        // commands, as those command will already be in the message list.
        if (_constructing) return;

        _messages.Add(c);
    }

    public int UnmutatedVersion() => _unmutatedVersion;
}

推荐答案

我相信前两个问题是对流程管理器的错误理解(又名Sagas,请参见底部的术语说明).

I believe the first two questions are the result of a wrong understanding of Process Managers (aka Sagas, see note on terminology at bottom).

似乎您正在尝试将其建模为逆聚合(就像我曾经做的那样).这样做的问题是:集合的社会契约"是其输入(命令)可以随时间变化(因为系统必须能够随时间变化),而其输出(事件)却不能.写入事件后,事件就成为历史,系统必须始终能够处理这些事件.在这种情况下,可以从不可变的事件流中可靠地加载聚合.

It seems like you are trying to model it (as I once did) as an inverse aggregate. The problem with that: the "social contract" of an aggregate is that its inputs (commands) can change over time (because systems must be able to change over time), but its outputs (events) cannot. Once written, events are a matter of history and the system must always be able to handle them. With that condition in place, an aggregate can be reliably loaded from an immutable event stream.

如果您尝试仅将输入和输出作为流程管理器实现反转,则记录的输出就不会成为问题,因为随着时间的推移,命令可能会被弃用并从系统中删除.当您尝试使用已删除的命令加载流时,它将崩溃.因此,无法可靠地从不变消息流中重新加载建模为反向聚合的流程管理器. (好吧,我确定您可以设计一种方法……但这是明智的吗?)

If you try to just reverse the inputs and outputs as a process manager implementation, it's output cannot be a matter of record because commands can be deprecated and removed from the system over time. When you try to load a stream with a removed command, it will crash. Therefore a process manager modeled as an inverse aggregate could not be reliably reloaded from an immutable message stream. (Well I'm sure you could devise a way... but is it wise?)

因此,让我们考虑一下它所取代的内容,以考虑实施流程管理器.以一个管理订单履行等流程的员工为例.为此用户要做的第一件事是在用户界面中设置一个视图,供他们查看.您要做的第二件事是在UI中创建按钮,以使用户执行操作以响应他们在视图上看到的内容.前任. 此行具有PaymentFailed,所以我单击CancelOrder.此行具有PaymentSucceededOrderItemOutOfStock,所以我单击ChangeToBackOrder.此订单是Pending,已有1天,所以单击FlagOrderForReview "...等等.一旦确定了正确的决策流程并开始占用用户太多时间,您就可以自动执行此流程.为了使它自动化,其他所有东西都可以保持不变(视图,甚至包括某些UI,以便您可以对其进行检查),但是用户已更改为一段代码.

So let's think about implementing a Process Manager by looking at what it replaces. Take for example an employee who manages a process like order fulfillment. The first thing you do for this user is setup a view in the UI for them to look at. The second thing you do is to make buttons in the UI for the user to perform actions in response to what they see on the view. Ex. "This row has PaymentFailed, so I click CancelOrder. This row has PaymentSucceeded and OrderItemOutOfStock, so I click ChangeToBackOrder. This order is Pending and 1 day old, so I click FlagOrderForReview"... and so forth. Once the decision process is well-defined and starts requiring too much of the user's time, you are tasked to automate this process. To automate it, everything else can stay the same (the view, even some of the UI so you can check on it), but the user has changed to be a piece of code.

走开,否则我将用一个非常小的Shell脚本代替您."

"Go away or I will replace you with a very small shell script."

现在,流程管理器代码会定期读取视图,并且如果存在某些数据条件,则可能会发出命令.本质上,Process Manager的最简单版本是一些可在计时器上运行的代码(例如,每小时一次),并且取决于特定的视图.那是我开始的地方...您已经拥有的东西(视图/视图更新器)和最少的添加(定期运行的代码).即使您稍后决定针对某些用例需要不同的功能,"Future You"也会更好地了解需要解决的特定缺陷.

The process manager code now periodically reads the view and may issue commands if certain data conditions are present. Essentially, the simplest version of a Process Manager is some code that runs on a timer (e.g. every hour) and depends on particular view(s). That's the place where I would start... with stuff you already have (views/view updaters) and minimal additions (code that runs periodically). Even if you decide later that you need different capability for certain use cases, "Future You" will have a better idea of the specific shortcomings that need addressing.

这是一个使您想起加尔定律的好地方也许还有YAGNI.

And this is a great place to remind you of Gall's law and probably also YAGNI.

  1. 网络上是否有其他有关实施事件源Sagas的参考资料?我可以理智地检查我的想法吗?

很难找到好的材料,因为这些概念具有非常可延展的实现,并且有各种各样的示例,其中许多示例出于通用目的而进行了过度设计.但是,这是我在答案中使用的一些参考.

Good material is hard to find as these concepts have very malleable implementations, and there are diverse examples, many of which are over-engineered for general purposes. However, here are some references that I have used in the answer.

DDD-不断发展的业务流程
DDD/CQRS Google网上论坛(大量阅读材料)

DDD - Evolving Business Processes
DDD/CQRS Google Group (lots of reading material)

注意,Saga一词的含义与Process Manager不同.常见的传奇实现基本上是每个步骤都包含一个路由清单,并且其相应的故障补偿都包含在清单中.这取决于路由选择清单的每个接收者执行在路由选择清单上指定的内容,并将其成功传递到下一跳,或者执行故障补偿并向后路由.当处理由不同组管理的多个系统时,这可能有点过于乐观,因此通常使用流程管理器代替. 看到这个所以问题以获取更多信息.

Note that the term Saga has a different implication than a Process Manager. A common saga implementation is basically a routing slip with each step and its corresponding failure compensation included on the slip. This depends on each receiver of the routing slip performing what is specified on the routing slip and successfully passing it on to the next hop or performing the failure compensation and routing backward. This may be a bit too optimistic when dealing with multiple systems managed by different groups, so process managers are often used instead. See this SO question for more information.

这篇关于EventSourced Saga实现的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆