NServiceBus - 如何为接收者订阅的每个消息类型获取单独的队列? [英] NServiceBus - How to get separate queue for each message type receiver subscribes to?

查看:51
本文介绍了NServiceBus - 如何为接收者订阅的每个消息类型获取单独的队列?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下情况:

因此,接收者订阅了两种事件:eventA 和 eventB.NServiceBus 为接收者(Receiver)创建队列,并将 eventA 和 eventB 类型的消息放入同一个队列.问题是,我是否可以将 NServiceBus 配置为对每种类型的接收器事件使用单独的队列(ReceiverEventA 和 ReceiverEventB)?或者我可以在单个进程中有两个接收器(每个接收器单独的队列).事实是,EventA 的处理时间比 EventB 长得多,而且它们是独立的——所以如果它们在不同的队列中,它们可以被并发处理.

So, receiver subscribes to two kind of events: eventA and eventB. NServiceBus creates queue for receiver (Receiver) and places messages of type eventA and eventB to to same queue. Question is, if I can configure NServiceBus to use separate queues (ReceiverEventA and ReceiverEventB) for each type of event for receiver? Or can I have two receivers in single process (and each receiver separate queue). Thing is, that EventA takes much longer to process than EventB, and they are independent - so if they would be in separate queues, they could be processed concurrently.

更新:如果我采用这样的幼稚方法,接收器将无法以空引用异常启动:

Update: If i'm going with naive approach like this, receiver fails to start with null reference exception:

 private static IBus GetBus<THandler, TEvent>()
    {                   
        var bus = Configure.With(new List<Type>
                                     {
                                         typeof(THandler), 
                                         typeof(TEvent),
                                         typeof(CompletionMessage)
                                     })
            .Log4Net()
            .DefaultBuilder()
            .XmlSerializer()
            .MsmqTransport()
            .IsTransactional(true)
            .PurgeOnStartup(false)
            .UnicastBus()
            .LoadMessageHandlers()
            .ImpersonateSender(false);

        bus.Configurer.ConfigureProperty<MsmqTransport>(x => x.InputQueue, "Queue" + typeof(THandler).Name);

        return bus.CreateBus().Start();
    }

    [STAThread]
    static void Main()
    {
        Busses = new List<IBus>
                     {
                         GetBus<ItemEventHandlerA, ItemEventA>(),
                         GetBus<ItemEventHandlerB, ItemEventB>()
                     };          

        Application.EnableVisualStyles();
        Application.SetCompatibleTextRenderingDefault(false);
        Application.Run(new TestForm());
    }

异常堆栈跟踪是:

at NServiceBusTest2.WinFormsReceiver.Program.GetBusTHandler,TEvent in C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs:line 57
在 NServiceBusTest2.WinFormsReceiver.Program.Main() 在 C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs:line 26
在 System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
在 System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args)在 Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
在 System.Threading.ThreadHelper.ThreadStart_Context(对象状态)
在 System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
在 System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
在 System.Threading.ThreadHelper.ThreadStart()

at NServiceBusTest2.WinFormsReceiver.Program.GetBusTHandler,TEvent in C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs:line 57
at NServiceBusTest2.WinFormsReceiver.Program.Main() in C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs:line 26
at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args) at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.ThreadHelper.ThreadStart()

推荐答案

为了支持我对@stephenl 发布的答案的评论,我开始写一篇更详尽的解释,说明为什么不应该有两个单独的进程.NServiceBus 基本上为每个进程强制执行一个输入队列.

I started writing a more drawn-out explanation of why you should NOT have two separate processes, in support of my comment to the answer @stephenl posted. NServiceBus basically enforces one input queue per process.

所以通常情况下,您会有两个独立的进程.EventAService 将从 QForEventA 读取 EventA,在与 EventBService 不同的进程中,后者将从 QForEventB 读取 EventB.

So normally, you would have two separate processes. EventAService would read EventA from QForEventA, in a separate process from EventBService which would read EventB from QForEventB.

然后我更仔细地查看了您的示例代码,并意识到您使用的是 Windows 窗体应用程序.呸!现在我觉得有点傻.当然你只能有一个进程.想象一下,如果在启动 Outlook 后,您还必须启动 MailService.exe 才能真正接收邮件!

Then I looked more carefully at your example code and realized you were in a Windows Forms app. Duh! Now I feel a bit foolish. Of course you can only have one process. Imagine if after launching Outlook you also had to launch MailService.exe to actually get mail!

所以问题实际上是您在 Windows 窗体应用程序中对 EventA 和 EventB 的处理时间截然不同.我不知道那是什么工作,但这对于客户端应用程序来说有点奇怪.

So the problem is really that you have drastically different processing times for EventA and EventB within your Windows Forms app. I don't have any way of knowing what that work is, but this is a little bit odd for a client application.

大多数时候它是一个需要大量处理的服务,并且客户端接收的任何消息都相当轻量级 - 类似于实体 X 已更改,因此下次您需要直接从数据库"并处理它只涉及从缓存中删除某些内容 - 当然不是一个长时间运行的过程.

Most of the time it's a service that has big processing to do, and any message received by a client is fairly lightweight - something along the lines of "Entity X has changed so next time you'll need to load it direct from the database" and processing it involves just dropping something out of cache - certainly not a long-running process.

但听起来无论出于何种原因,您的客户端中的处理需要更长的时间,这在 WinForms 应用程序中令人担忧,因为担心 UI 线程编组、阻塞 UI 线程等.

But it sounds like for whatever reason the processing in YOUR client takes longer, which in a WinForms app is concerning because of concerns about UI thread marshalling, blocking the UI thread, etc.

我建议您不要在 WinForms 应用程序的 NServiceBus 处理程序中进行所有处理,而是将其编组到其他地方.将它作为工作项或类似的东西扔到 ThreadPool 中.或者将长时间运行的项目放入队列中,并以自己的速度在后台线程处理这些项目.这样 NServiceBus 消息处理程序所做的就是是的,收到消息.非常感谢."那么一次处理一个 NServiceBus 消息应该无关紧要.

I would suggest a system where you don't do all the processing in the NServiceBus handler in your WinForms app, but marshal it off somewhere else. Throw it over to the ThreadPool as a work item, or something like that. Or put the long-running items into a Queue and have a background thread crunch on those at its own speed. That way all the NServiceBus message handler does is "Yep, got the message. Thank you very much." Then it shouldn't really matter if the NServiceBus messages are processed one at a time.

更新:

在评论中,OP 询问如果您在 NServiceBus finsihes 接收工作后将工作扔到 ThreadPool 会发生什么.那当然是这种方法的另一面——在 NServiceBus 完成后,你是自己的——如果它在 ThreadPool 中失败,那么由你来创建自己的重试逻辑,或者只是捕获异常,提醒 WinForms 应用的用户,让它死掉.

In the comments, the OP asks what happens if you throw the work to the ThreadPool after NServiceBus finsihes receiving it. That is, of course, the flip side to this approach - after NServiceBus is done, you're on your own - if it fails in the ThreadPool, then it's up to you to create your own retry logic, or just catch the exception, alert the user of the WinForms app, and let it die.

显然这是最佳的,但它引出了一个问题 - 您到底想在 WinForms 应用程序中完成什么样的工作?如果 NServiceBus 提供的健壮性(有害消息的自动重试和错误队列)是这个难题的关键部分,那么为什么它首先会在 WinForms 应用程序中发生?此逻辑可能需要卸载到 WinForms 应用程序外部的服务,其中为每种消息类型(通过部署单独的服务)设置单独的队列变得容易,然后只有影响 UI 的部分才会被发送回 WinForms 客户端.当发送到 UI 的消息只影响 UI 时,处理它们几乎总是微不足道的,并且您不需要卸载到 ThreadPool 来跟上.

Obviously that's optimal, but it begs the question - just exactly what sort of work are you trying to accomplish in the WinForms app? If the robustness that NServiceBus offers (the automatic retries and error queue for poison messages) is a critical piece of this puzzle, then why is it going on in a WinForms application in the first place? This logic probably needs to be offloaded to a service external to the WinForms application, where having separate queues for each message type (by deploying separate services) becomes easy, and then only the pieces affecting the UI are ever sent back to the WinForms client. When the messages to the UI only affect the UI, processing them is almost always trivial, and you won't have any need for offloading to the ThreadPool to keep up.

直接说你在GitHub问题中描述的情况,这确实听起来就像每种消息类型的单独进程正是规定的解决方案的那种场景.我听说部署和管理这么多流程听起来很困难,但我认为您会发现它并不像听起来那么糟糕.甚至还有优势 - 例如,如果您必须在 Amazon.com 上重新部署您的连接器,您只需重新部署那个端点,其他任何一个都不会停机,或者担心错误可能会被引入其他人.

Speaking directly to the situation you described in the GitHub Issue, this really does sound like the kind of scenario where separate processes for each message type are exactly the prescribed solution. I hear that it sounds overwhelming to deploy and manage that many processes, but I think you'll find that it's not as bad as it sounds. There are even advantages - if you have to redeploy your connector with Amazon.com, for instance, you only have to redeploy THAT endpoint, with no downtime for any of the others, or any worries that bugs may have been introduced to the others.

为了简化部署,希望您使用的是持续集成服务器,然后检查诸如 DropkicK 之类的工具 有助于编写脚本的部署.就个人而言,我最喜欢的部署工具是老式的 Robocopy.甚至像 1) NET STOP ServiceName、2) ROBOCOPY、3) NET START ServiceName 这样简单的东西也非常有效.

To ease deployment, hopefully you're using a continuous integration server, and then check in to tools like DropkicK that help deployments to be scripted. Personally, my favorite deployment tool is good old Robocopy. Even something as simple as 1) NET STOP ServiceName, 2) ROBOCOPY, 3) NET START ServiceName is quite effective.

这篇关于NServiceBus - 如何为接收者订阅的每个消息类型获取单独的队列?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆