MassTransit:消费者使用完所有消息后,如何停止总线? [英] MassTransit: How to stop the bus after all messages are consumed by consumer?

查看:190
本文介绍了MassTransit:消费者使用完所有消息后,如何停止总线?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我使用MassTransit消耗了队列中的所有消息之后,我尝试停止总线.我将并发消息限制设置为1,因为我的使用者需要一次处理一条消息.

I attempt to stop the bus after all the messages in queue consumed using MassTransit. And I set the concurrent Message limit to 1 as my consumer needs to process one message at a time.

我尝试将bus.StopAsync()放在bus.StartAsync后面,如下所示.结果表明,在消耗了一条消息之后,总线将停止.

I've tried putting the bus.StopAsync() behind bus.StartAsync like below. And the result showed that after one message being consumed, the bus will stop.

总线配置:

IBusControl bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                IRabbitMqHost host = cfg.Host(new Uri("rabbitmq://localhost"), hostConfigurator =>
                {
                    hostConfigurator.Username("username");
                    hostConfigurator.Password("password");
                });

            cfg.ReceiveEndpoint(host, "MyResult", ep =>
            {
                ep.Bind("MyExchange", s => { s.Durable = true; });

                ep.Consumer<MessageConsumer>(mc =>
                {
                    mc.UseConcurrentMessageLimit(1);
                });
            });
        });

公交启停:

await bus.StartAsync();

await bus.StopAsync();

我的问题是队列中的所有消息都用完后如何停止总线.我对MassTransit相当陌生,并且真的对调用消费者和停止公交的顺序感到好奇.感谢有人可以帮忙.谢谢.

My question is how to stop the bus after all messages in queue have been consumed. I am quite new to MassTransit, and really curious about the sequence of invoking consumers and stopping bus. Appreciate if someone could help. Thanks.

推荐答案

在Testing命名空间中,有一项功能用于监视总线上的活动,该功能可用于表示没有消息在被使用(您可以按照建议停车.)

In the Testing namespace, there is a feature used to watch for activity on the bus, which could be used to signal that there are no messages being consumed (after which, you could stop the bus as you suggest).

您可以看到单元测试: https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.Tests/BusActivityMonitor_Specs.cs#L69

You can see the unit test: https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.Tests/BusActivityMonitor_Specs.cs#L69

使用以下方式添加观察者:

The observer is added using:

var activityMonitor = bus.CreateBusActivityMonitor(TimeSpan.FromMilliseconds(500));

一旦总线空闲,超时将为true:

Once the bus is idle, the timeout will be true:

var timeout = await activityMonitor.AwaitBusInactivity(TimeSpan.FromSeconds(10));

如果timeout为true,则在指定的时间内总线上没有活动.

If timeout is true, there is no activity on the bus in the time specified.

这篇关于MassTransit:消费者使用完所有消息后,如何停止总线?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆