ASP.NET Web Api的事件发布者 [英] Event publisher for ASP.NET Web Api

查看:75
本文介绍了ASP.NET Web Api的事件发布者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经开始使用微服务,我需要创建一个事件发布机制.

我计划使用Amazon SQS.

这个想法很简单.我将事件与聚合存储在数据库中. 如果用户要更改电子邮件,则事件UserChangedEmail将存储在数据库中.

我还有事件处理程序,例如UserChangedEmailHandler,它将(在这种情况下)负责将此事件发布到SQS队列,以便其他服务可以知道用户已更改电子邮件.

我的问题是,要实现这一目标的做法是什么?我应该有某种后台定时过程来扫描事件表并将事件发布到SQS吗? 这可以是WebApi应用程序内的进程(首选),还是应该是单独的进程?

一种想法是使用Hangfire,但它在一分钟之内不支持cron作业.

有什么建议吗?

按照答案之一的建议,我已经查看了NServicebus. NServiceBus页面上的示例之一显示了我的关注重点.

在他们的示例中,他们创建一个已下订单的日志.如果成功提交了日志或数据库条目,但是发布中断并且事件从未发布,该怎么办?

这是事件处理程序的代码:

public class PlaceOrderHandler :
    IHandleMessages<PlaceOrder>
{
    static ILog log = LogManager.GetLogger<PlaceOrderHandler>();
    IBus bus;

    public PlaceOrderHandler(IBus bus)
    {
        this.bus = bus;
    }

    public void Handle(PlaceOrder message)
    {
        log.Info($"Order for Product:{message.Product} placed with id: {message.Id}");
        log.Info($"Publishing: OrderPlaced for Order Id: {message.Id}");

        var orderPlaced = new OrderPlaced
        {
            OrderId = message.Id
        };
        bus.Publish(orderPlaced); <!-- my concern
    }
}

解决方案

现成的建议

我建议您不要研究自己的现成产品,因为这里有很多复杂性,这些复杂性一开始就不会显现出来,例如

  • 管理事件订阅者列表-SQS队列与事件使用者(而不是事件生产者)更合适地配对,因为当消息被消耗时,它不再在队列中可用-因此,如果要支持多个订阅者给定事件(这是事件驱动的体系结构的巨大好处),您如何知道在首次引发事件消息时将事件消息推送到哪个SQS队列?
  • 重试语义,错误转发队列-处理临时性基础结构问题导致的临时错误与业务逻辑语义问题导致的永久性错误
  • 哪些消息何时发出以及在何处发送的审计线索
  • 通过SQS发送的消息的安全性(您的业务案例需要对它们进行加密吗?SQS是Amazon提供的一项应用程序服务,不提供存储级加密
  • 邮件大小-SQS有邮件大小限制,因此您最终可能需要处理大型邮件的带外传输

那只是我的头顶...

一些现成的系统可以提供帮助:

  • NServiceBus 提供了用于管理命令和事件消息传递的框架,并且具有允许灵活传输类型的插件框架- NServiceBus.SQS 提供SQS作为传输.
    • 提供全面而灵活的重试,审核和错误处理
    • 公然使用命令vs事件(命令消息说执行此操作",并发送到单个服务进行处理,事件消息说发生了什么事",并发送到任意数量的灵活订户)
    • 发件箱模式即使使用非事务一致的传输(例如SQS),也可以提供事务一致的消息
    • 当前,SQS插件使用默认的NServiceBus订阅者持久性,这需要SQL Server来存储事件订阅者列表(有关利用SNS的选项,请参见下文)
    • 为sagas提供支持,提供了一个框架,以通过补偿动作来确保多事务最终与回滚的一致性
    • 超时支持计划的邮件处理
    • 商业产品,不是免费的,但是许多插件/扩展都是开源的
  • 大众运输
    • 不支持现成的SQS,但确实支持Azure Service Bus和RabbitMq,因此如果可以的话,可以作为您的替代选择
    • 与NServiceBus相似的产品,但并非100%相同- NServiceBus与MassTransit 提供全面的比较
    • 完全开源/免费
  • 只是说
    • 专为基于SQS/SNS设计的轻量级开源消息传递框架
    • 每个事件的SNS主题,每个微服务的SQS队列,使用本机SNS SQS Queue订阅来实现扇出
    • 免费开源

可能还有其他人,并且我在NServiceBus方面拥有最丰富的个人经验,但是我强烈建议您研究现成的解决方案-它们将使您腾出时间来根据业务事件开始设计系统,而不用担心事件传递的机制.

即使您确实想构建自己的学习练习,回顾一下以上工作可能也会为您提供一些可靠的事件驱动消息传递所需的技巧.

交易一致性和发件箱模式

已对问题进行了编辑,以询问如果部分操作成功但发布操作失败会发生什么情况.我已经将其称为消息传递的事务一致性,它通常意味着在事务中,所有业务副作用都已落实,或者没有.业务副作用可能意味着:

  • 数据库记录已更新
  • 另一个数据库记录已删除
  • 邮件已发布到邮件队列
  • 电子邮件已发送

如果数据库操作失败,通常您不希望发送电子邮件或发布消息,同样,如果消息发布失败,您也不想执行数据库操作.

那么如何确保消息的一致性?

NServiceBus通过以下两种方式之一进行处理:

  1. 使用事务一致的消息传输,例如MSMQ.

    1. MSMQ能够利用发件箱模式

      1. 使用发件箱模式,不会立即发送邮件-消息将它们添加到数据库(最好是与业务数据相同的数据库)中的发件箱表中,作为同一事务的一部分
      2. 在提交事务之后,它将尝试分派每条消息,并仅在成功分派后才将其从发件箱中删除
      3. 如果在派发后但删除前系统发生故障,则将再次发送该消息.为了弥补这一点,启用发件箱后,NServiceBus还将通过保留所有入站消息的记录并丢弃重复项来对入站消息进行重复数据删除.
      4. 重复数据删除在Amazon SQS中特别有用,因为它本身最终是一致的,并且相同的消息可能会收到两次.
      5. 这与您的问题中的原始概念相去不远,但是有一些区别:

        1. 您正在构思后台定时过程,以扫描事件表(也称为发件箱表)并将事件发布到SQS
        2. NServiceBus在管道中执行处理程序-使用发件箱,将消息分发到传输(也就是将消息推送到SQS队列中)只是管道中的最后步骤之一.因此-每当处理一条消息时,在处理该事务期间生成的所有出站消息都将在提交业务交易后立即分派-无需对事件表进行定时扫描.

      6. 注意:发件箱仅在存在环境NServiceBus Handler事务时才成功-即,当您在NServiceBus管道中处理消息时.在某些情况下(例如, WebAPI请求管道.因此, NServiceBus建议使用您的API请求仅发送一条命令消息,然后将业务数据操作与后端端点服务中事务一致的命令处理程序中的其他消息传递结合在一起.尽管他们文档中的第3点与MSMQ的关系比与SQS传输的关系更重要.

    处理程序语义

    关于提案的另一条评论-按照惯例,UserChangedEmailHandler通常会与响应电子邮件更改而执行某些操作的服务相关联,而不是简单地参与电子邮件更改信息的传播.当您的系统发布了50个事件时,您是否要50个不同的处理程序仅将这些消息推送到不同的队列中?

    上述系统使用通用框架通过传输方式传播消息,因此您可以为预订系统保留UserChangedEmailHandler,并在其中包含用户更改电子邮件时应发生的业务逻辑.

    I have started to work with micro-services and I need to create an event publishing mechanism.

    I plan to use Amazon SQS.

    The idea is quite simple. I store events in the database in the same transaction as aggregates. If user would change his email, event UserChangedEmail will be stored in the database.

    I also have event handler, such as UserChangedEmailHandler, which will (in this case) be responsible to publish this event to SQS queue, so other services can know that user changed email.

    My question is, what is the practice to achieve this? Should I have some kind of background timed process which will scan events table and publish events to SQS? Can this be process within WebApi application (preferable), or should this be a separate a process?

    One of the ideas was to use Hangfire, but it does not support cron jobs under a minute.

    Any suggestions?

    EDIT:

    As suggested in the one of the answers, I've looked in to NServicebus. One of the examples on the NServiceBus page shows core of my concern.

    In their example, they create a log that order has been placed. What if log or database entry is successfully commited, but publish breaks and event never gets published?

    Here's the code for the event handler:

    public class PlaceOrderHandler :
        IHandleMessages<PlaceOrder>
    {
        static ILog log = LogManager.GetLogger<PlaceOrderHandler>();
        IBus bus;
    
        public PlaceOrderHandler(IBus bus)
        {
            this.bus = bus;
        }
    
        public void Handle(PlaceOrder message)
        {
            log.Info($"Order for Product:{message.Product} placed with id: {message.Id}");
            log.Info($"Publishing: OrderPlaced for Order Id: {message.Id}");
    
            var orderPlaced = new OrderPlaced
            {
                OrderId = message.Id
            };
            bus.Publish(orderPlaced); <!-- my concern
        }
    }
    

    解决方案

    Off the Shelf Suggestions

    Rather than rolling your own, I recommend looking into off the shelf products, as there is a lot of complexity here that will not be apparent out the outset, e.g.

    • Managing event subscriber list - an SQS queue is more appropriately paired with an event consumer, rather than with an event producer as when a message is consumed it is no longer available on the queue - so if you want to support multiple subscribers for a given event (which is a massive benefit of event driven architectures), how do you know which SQS queues you push the event message onto when it is first raised?
    • Retry semantics, error forwarding queues - handling temporary errors due to ephemeral infrastructure issues vs permanent errors due to business logic semantic issues
    • Audit trails of which messages were raised when and sent where
    • Security of messages sent via SQS (does your business case require them to be encrypted? SQS is an application service offered by Amazon which doesn't provide storage level encryption
    • Size of messages - SQS has a message size limit so you may eventually need to handle out-of-band transmission of large messages

    And that's just off the top of my head...

    A few off the shelf systems that would assist:

    • NServiceBus provides a framework for managing command and event messaging, and it has a plugin framework permitting flexible transport types - NServiceBus.SQS offers SQS as a transport.
      • Offers comprehensive and flexible retry, audit and error handling
      • Opinionated use of commands vs events (command messages say "Do this" and are sent to a single service for processing, event messages say "Something happened" and are sent to an arbitrary number of flexible subscribers)
      • Outbox pattern provides transactionally consistent messaging even with non-transactionally consistent transports, such as SQS
      • Currently the SQS plugin uses default NServiceBus subscriber persistence, which requires an SQL Server for storing the event subscriber list (see below for an option that leverages SNS)
      • Built in support for sagas, offering a framework to ensure multi transaction eventual consistency with rollback via compensating actions
      • Timeouts supporting scheduled message handling
      • Commercial offering, so not free, but many plugins/extensions are open source
    • Mass Transit
      • Doesn't support SQS off the shelf, but does support Azure Service Bus and RabbitMq, so could be an alternative for you if that is an option
      • Similar offering to NServiceBus, but not 100% the same - NServiceBus vs MassTransit offers a comprehensive comparison
      • Fully open source/free
    • Just Saying
      • A light-weight open source messaging framework designed specifically for SQS/SNS based
      • SNS topic per event, SQS queue per microservice, use native SNS SQS Queue subcription to achieve fanout
      • Open Source Free

    There may be others, and I've most personal experience with NServiceBus, but I strongly recommend looking into the off the shelf solutions - they will free you up to start designing your system in terms of business events, rather than worrying about the mechanics of event transmission.

    Even if you do want to build your own as a learning exercise, reviewing how the above work may give you some tips on what's needed for reliable event driven messaging.

    Transactional Consistency and the Outbox Pattern

    The question has been edited to ask about the what happens if parts of the operation succeed, but the publish operation fails. I've seen this referred to as the transactional consistency of the messaging, and it generally means that within a transaction, all business side-effects are committed, or none. Business side effects may mean:

    • Database record updated
    • Another database record deleted
    • Message published to a message queue
    • Email sent

    You generally don't want an email sent or a message published, if the database operation failed, and likewise, you don't want the database operation committed if the message publish failed.

    So how to ensure consistency of messaging?

    NServiceBus handles this in one of two ways:

    1. Use a transactionally consistent message transport, such as MSMQ.

      1. MSMQ is able to make use of Microsoft's DTC (Distributed Transaction Coordinator) and DTC can enroll the publishing of messages in a distributed transaction with SQL server updates - this means that if your business transaction fails, your publish operation will be rolled back and visa versa

    2. The Outbox Pattern

      1. With the outbox pattern, messages are not dispatched immediately - they are added to an Outbox table in a database, ideally the same database as your business data, as part of the same transaction
      2. AFTER the transaction is committed, it attempts to dispatch each message, and only removes it from the outbox on successful dispatch
      3. In the event of a failure of the system after dispatch but before delete, the message will be transmitted a second time. To compensate for this, when Outbox is enabled, NServiceBus will also do de-duplication of inbound messages, by maintaining a record of all inbound messages and discarding duplicates.
      4. De-duplication is especially useful with Amazon SQS, as it is itself eventually consistent, and the same messages may be received twice.
      5. This is the not far from the original concept in your question, but there are differences:

        1. You were concepting a background timed process to scan the events table (aka Outbox table) and publish events to SQS
        2. NServiceBus executes handlers within a pipeline - with Outbox, the dispatch of messages to the transport (aka pushing messages into an SQS queue) is simply one of the last steps in the pipeline. So - whenever a message is handled, any outbound messages generated during the handling will be dispatched immediately after the business transaction is committed - no need for a timed scan of the events table.

      6. Note: Outbox is only successful when there is an ambient NServiceBus Handler transaction - i.e. when you are handling a message within the NServiceBus pipeline. This will NOT be the case in some contexts, e.g. a WebAPI Request pipeline. For this reason, NServiceBus recommends using your API request to send a single Command message only, and then combining business data operations with further messaging within a transactionally consistent command handler in a backend endpoint service. Although point 3 in their doc is more relevant to the MSMQ than SQS transport.

    Handler Semantics

    One more comment about your proposal - by convention, UserChangedEmailHandler would more commonly be associated with the service that does something in response to the email being changed, rather than simply participating in the propagation of the information that the email has changed. When you have 50 events being published by your system, do you want 50 different handlers just to push those messages onto different queues?

    The systems above use a generic framework to propagate messages via the transport, so you can reserve UserChangedEmailHandler for the subscribing system and include in it the business logic that should happen whenever a user changes their email.

    这篇关于ASP.NET Web Api的事件发布者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆