重用:2个进程中的2个处理程序。前后交替打 [英] Rebus: 2 handlers in 2 processes. Hit inconsistently and alternately

查看:103
本文介绍了重用:2个进程中的2个处理程序。前后交替打的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个使用Rebus的控制台应用程序。它们都引用定义消息(命令和事件)的程序集。
控制台应用程序 A发送命令并监听事件以进行记录(例如:发送CreateTCommand并监听TCreatedEvent)。控制台应用程序 B(实际上是ASP.NET Core应用程序)侦听命令并进行处理(例如:由CreateTCommand发起传奇,创建聚合并引发TCreatedEvent)。在应用程序 B的进程内的另一个DLL中,有一个TCreatedEvent的处理程序。

I have two console apps using Rebus. They both reference an assembly where messages (commands and events) are defined. Console app "A" sends commands and listens to events for logging purposes (e.g.: sends a CreateTCommand and listens for TCreatedEvent). Console app "B" (which is actually an ASP.NET Core app) listens to commands and handles them (e.g.: a saga is initiated by CreateTCommand, the aggregate is created and it raises a TCreatedEvent). In another DLL inside the process of app "B" there is a handler for TCreatedEvent.

因此,我有一个由应用程序 A发送的创建命令和两个处理程序创建的事件,一个在应用 A中,一个在应用 B中。

So I have a creation command sent by app "A" and two handlers for the created event, one in app "A" and one in app "B".

问题:当我第一次从应用 A发送命令时,应用 B引发创建的事件,该事件在同一过程中触发处理程序。应用程序 A中的处理程序
未触发。
来自应用程序 A的其他命令始终由应用程序 B中的传奇处理,但创建的事件再也不会在该过程中到达处理程序,而是由应用程序 A处理!
有时(我无法理解如何重现)来自应用程序 A的命令未由应用程序 B中的传奇处理(我在MSMQ的错误队列中找到了该命令,但异常情况为带有ID的消息无法分派给任何处理程序)。
有时(非常罕见)两个处理程序都被击中。但是我无法始终如一地重现该行为...

The problem: when I send a command from app "A" the first time, app "B" raises the created event which triggers the handler in the same process. The handler in app "A" is not triggered. Further commands from app "A" are always handled by the saga in app "B" but created events never again hit the handler in that process, but are handled by app "A"!!! Sometimes (I can't understand how to reproduce) commands from app "A" are not handled by the saga in app "B" (I find the command in the error queue in MSMQ with the exception "message with Id could not be dispatched to any handlers"). Sometimes (very rarely) both handlers have been hit. But I can't reproduce the behavior consistently...

我对此的感觉(对Rebus几乎一无所知,这对我来说是很新的):

My sensations about this (knowing nearly nothing about Rebus, which is quite new to me):


  • 这可能是并发问题吗?我的意思是:将Rebus配置为将订阅存储在外部流程中(使用SQL或Mongo,该问题不会消失),因此我认为第一个处理程序可能过快并将事件标记为在第二个处理程序之前已处理调用

  • 检查订阅SQL表,我发现有5行(在代码中使用我在代码中订阅的每种事件类型(在应用程序启动时使用bus.Subscribe()))同一地址(队列名称链接到我的本地计算机名称)。只有一个地址有2个进程尝试使用它,这是一个问题吗?

Rebus的配置代码在两个应用程序中相同,如下所示:

The configuration code for Rebus is the same in the 2 apps and goes like this:

        const string inputQueueAddress = "myappqueue";
        var mongoClient = new MongoClient("mongodb://localhost:27017");
        var mongoDatabase = mongoClient.GetDatabase("MyAppRebusPersistence");
        var config = Rebus.Config.Configure.With(new NetCoreServiceCollectionContainerAdapter(services))
            .Logging(l => l.Trace())
            .Routing(r => r.TypeBased()
                .MapAssemblyOf<AlertsCommandStackAssemblyMarker>(inputQueueAddress)
                .MapAssemblyOf<AlertsQueryStackAssemblyMarker>(inputQueueAddress)
            )
            .Subscriptions(s => s.StoreInMongoDb(mongoDatabase, "subscriptions"))
            .Sagas(s => s.StoreInMongoDb(mongoDatabase))
            .Timeouts(t => t.StoreInMongoDb(mongoDatabase, "timeouts"))
            .Transport(t => t.UseMsmq(inputQueueAddress));

        var bus = config.Start();
        bus.Subscribe<AlertDefinitionCreatedEvent>();
        bus.Subscribe<AlertStateAddedEvent>();
        bus.Subscribe<AlertConfigurationForEhrDefinitionAddedEvent>();
        services.AddSingleton(bus);

        services.AutoRegisterHandlersFromThisAssembly();

我希望有人能提供帮助,这让我发疯了...

I hope someone can help, this is driving me nuts...

ps:传递isCentralized时也存在问题:对subscription.StoreInMongoDb()正确。

p.s.: the problem is present also when passing isCentralized: true to subscription.StoreInMongoDb().

编辑1:我添加了控制台日志记录和您会看到这种奇怪的行为:
https://postimg.org/image/czz5lchp9/

EDIT 1: I added console logging and you can see this strange behavior: https://postimg.org/image/czz5lchp9/

第一个命令已成功发送。它由传奇处理,事件触发了控制台应用程序 A中的处理程序。
Rebus说第二条命令没有分派给任何处理程序,但实际上是由传奇处理的(我遵循调试中的代码),并且该事件由应用程序 B而不是 A中的处理程序处理。 ..为什么? ;(

The first command is sent successfully. It is handled by the saga and the event triggered the handler in console app "A". Rebus says the second command was not dispatched to any handlers but it was actually handled by the saga (I followed the code in debug) and the event was handled by the handler in app "B" and not "A"... why? ;(

编辑2:我正在调试Rebus的源代码。我注意到在ThreadPoolWorker.cs中,方法TryAsyncReceive

EDIT 2: I'm debugging Rebus's source code. I noticed that in ThreadPoolWorker.cs, method TryAsyncReceive

    async void TryAsyncReceive(CancellationToken token, IDisposable parallelOperation)
    {
        try
        {
            using (parallelOperation)
            using (var context = new DefaultTransactionContext())
            {
                var transportMessage = await ReceiveTransportMessage(token, context);

                if (transportMessage == null)
                {
                    context.Dispose();

                    // no need for another thread to rush in and discover that there is no message
                    //parallelOperation.Dispose();

                    _backoffStrategy.WaitNoMessage();
                    return;
                }

                _backoffStrategy.Reset();

                await ProcessMessage(context, transportMessage);
            }
        }

在应用程序 B发布TCreatedEvent之后,在应用程序 A中,代码​​到达
,等待ProcessMessage(context,transportMessage)
,其中transportMessage是实际事件。在应用程序 B的过程中未达到此代码行。似乎是邮件的第一个接收者将其从MSMQ的队列中删除了。正如我说的那样,我对Rebus和总线总体而言还很陌生,但是如果这种行为是按设计的,那么我就很困惑……如何在多个进程中使用多个总线来侦听同一队列?

after the TCreatedEvent is published by app "B", in app "A" the code reaches await ProcessMessage(context, transportMessage) where transportMessage is the actual event. This line of code is not reached in app "B"'s process. Seems like the first receiver of the message removes it from MSMQ's queue. As I said I'm quite new to Rebus and buses in general, but if this behavior is as designed I'm quite puzzled... how can multiple buses in multiple processes listen to the same queue???

推荐答案

除非有两个总线实例是同一终结点的多个实例,否则您永远不应有两个总线实例从同一队列接收消息。

You should never have two bus instances receiving messages from the same queue, unless they are multiple instances of the same endpoint.

当两个进程使用相同的输入队列时,它们将相互接收消息。如果您使用它实现竞争消费者模式,则绝对可以可以在一组工作进程之间平均分配工作,但是您不能在多个不同的端点之间共享输入队列。

When two processes use the same input queue, they will take messages from each other. This can be absolutely fine if you are implementing the competing consumers pattern, using it to distribute work evenly between a cluster of worker processes, but you cannot share an input queue between multiple different endpoints.

我的猜测是,如果您将所有事情都变得更加可预测让每个总线实例使用其自己的输入队列;)

My guess is that everything will seem much more predictable if you let each bus instance use its own input queue ;)


现在,您告诉我这些处理程序需要存在于主应用程序中

Now you're telling me these handlers need to live inside the main app

不,我不是告诉您:如果让两个不同的应用程序抢夺彼此的消息,您将得到无法预测的结果。

No I'm not :) I'm telling you that you will get unpredictable results if you let two different applications snatch each other's messages.

虽然所有处理程序都位于同一总线实例中(因此可以由同一队列中的消息调用)是完全可以的,但最常见的情况是您将以某种方式拆分您的应用程序帽子与您要如何开发系统相匹配。

While it's perfectly fine for all handlers to be in the same bus instance (and thus be invoked by messages from the same queue), the most common scenario is that you will split your application in a way that matches how you want to evolve the system.

通过这种方式,您可以一次更新一个应用程序,而无需进行繁琐的停止操作更新。

This way you can update a single application at a time, avoiding big "stop the world"-updates.

您可以通过启动多个端点(每个端点使用自己的队列)来实现此目的,然后在端点之间路由消息以进行通信。

You do this by starting several endpoints, each using its own queue – and then you ROUTE messages between endpoints in order to communicate.

请考虑要向命令处理器发送命令的方案。命令处理器是Rebus端点,它从 command_processor 队列获取消息。

Consider a scenario where you want to send a command to a command processor. The command processor is a Rebus endpoint that gets its messages from the command_processor queue.

在发送者端,您将配置端点映射(您可以在 Rebus Wiki上的路由部分如下所示:

In the sender's end you will configure an "endpoint mapping" (you can read more about that in the routing section on the Rebus wiki which could look like this:

Configure.With(...)
    .Transport(t => t.UseMsmq("sender"))
    .Routing(r => {
        r.TypeBased()
            .Map<TheCommand>("command_processor");
    })
    .Start();

这将使发送方可以轻松地

which will enable the sender to simply go

await bus.Send(new TheCommand(...));

然后总线将知道将命令消息传递到哪个队列。

and then the bus will know which queue to deliver the command message to.

我希望可以更清楚了:)

I hope that makes it more clear :)

请注意s是点对点消息传递的一种非常简单的情况发送一条消息,该消息将由一个其他端点使用。 Rebus还可以帮助您解决其他几种模式,例如请求/回复发布/订阅

Please note that this is a very simple case of point-to-point messaging where one endpoint sends a message which is meant to be consumed by one single other endpoint. There exists several other patterns that Rebus can help you with, e.g. request/reply and publish/subscribe.

这篇关于重用:2个进程中的2个处理程序。前后交替打的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆