MassTransit封顶消息速率为10 [英] MassTransit capping message rates at 10

查看:75
本文介绍了MassTransit封顶消息速率为10的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我设置了与RabbitMQ一起使用的大众运输消费者服务,但我不知道如何提高消费者的速度-似乎硬性限制每秒接收10条消息.

I have a mass transit consumer service set up to work with RabbitMQ and I can't figure out how to increase the speed of the consumer - it seems to hard cap at 10 messages received per second.

我尝试了此处列出的步骤: https://groups.google.com/forum/#!可以提高邮件的下载速度.

I have tried the steps listed here: https://groups.google.com/forum/#!msg/masstransit-discuss/plP4n2sixrY/xfORgTPqcwsJ, with no success - setting the prefetch and the concurrent consumers to 25 does nothing other than increasing the acknowledged messages, but it doesn't increase the rate at which the messages are downloaded.

我的配置如下:

ServiceBusFactory.ConfigureDefaultSettings(x =>
    {
        x.SetConcurrentReceiverLimit(25);
        x.SetConcurrentConsumerLimit(25);
    });

_bus = ServiceBusFactory.New(
    sbc =>
        {
            sbc.UseRabbitMq(x => 
                x.ConfigureHost(
                    "rabbitmq://localhost/Dev/consume?prefetch=25",
                    y =>
                        {
                            y.SetUsername(config.Username);
                            y.SetPassword(config.Password);
                        }));
            sbc.UseLog4Net();
            sbc.ReceiveFrom("rabbitmq://localhost/Dev/consume?prefetch=25");
            sbc.Subscribe(x => RegisterConsumers(x, container));
            sbc.UseJsonSerializer();
            sbc.SetConcurrentConsumerLimit(25);
        });

我在两个地方设置并发使用者限制,因为不确定是否需要在默认设置或总线配置中设置它,并且使用者通过统一注册-我省略了使用者订阅,因为所有订阅者都在接收.

I'm setting the concurrent consumer limit in two places as I'm not sure whether I need to set it on the default or in the bus configuration, and the consumers are registered via unity - I have omitted the consumer subscription as all subscribers are receiving.

对于需要设置其他内容还是需要更改设置顺序,我有些困惑.

I'm a little confused as to whether there's anything else I need to set or if I need to change the order in which I'm setting the configs.

任何帮助都将不胜感激.

Any help greatly appreciated.

推荐答案

度过了一个浪漫的夜晚,讨论了克里斯建议的其他事情之后,我发现还有还有另一个 >您必须要做的事情才能使其正常工作.

After spending a romantic evening with the problem and trying out different things suggested by Chris, I've found out that there is yet another thing you have to do to make it work like it should.

具体来说,,您需要在使用者队列地址上设置预取:

Specifically, yes, you need to set the prefetch on the consumer queue address:

sbc.UseRabbitMq(
                f =>
                    f.ConfigureHost(
                        new Uri( "rabbitmq://guest:guest@localhost/masstransit_consumer" ),
                        c =>
                        {
                        } )
                );

int pf = 20; // prefetch

// set consumer prefetch (required!)
sbc.ReceiveFrom( string.Format( "rabbitmq://guest:guest@localhost/masstransit_consumer?prefetch={0}", pf ) );

但这还不够.

该键可在mtstress工具Chris的答案下方的注释中提及.原来是工具调用:

The key is available in the code of the mtstress tool Chris mention in his comment below his answer. It turns out the tool calls:

int _t, _ct;
ThreadPool.GetMinThreads( out _t, out _ct );
ThreadPool.SetMinThreads( pf, _ct );

将此代码添加到我的代码中即可解决此问题.我不知道为什么MSMQ传输不需要这样做,但是...

Adding this to my code resolves the issue. I wonder though why this is not required with MSMQ transport, though...

更新#1

进一步调查后,我发现了一个可能的罪魁祸首.在ServiceBusBuilderImpl中.

After further investigation I've found a possible culprit. It's in the ServiceBusBuilderImpl.

有一种提高上限的方法,ConfigureThreadPool.

There is a method to raise the limit, the ConfigureThreadPool.

这里的问题是它调用CalculateRequiredThreads,它应该返回所需线程的数量.不幸的是,后者在我的客户端Windows 7和Windows Server上都返回值.因此,ConfigureThreadPool实际上没有任何作用,因为调用ThreadPool.SetMin/MaxThreads时会忽略负值.

The problem here is that it calls CalculateRequiredThreads which should return the number of required threads. Unfortunately the latter returns a negative value on both my client Windows 7 and my Windows Server. Thus, the ConfigureThreadPool effectively does nothing as the negative value is then ignored when calling ThreadPool.SetMin/MaxThreads.

这个负值呢?似乎CalculateRequiredThreads调用ThreadPool.GetMinThreadsThreadPool.GetAvailableThreads并使用公式得出所需线程的数量:

What about this negative value? It seems the CalculateRequiredThreads calls ThreadPool.GetMinThreads and ThreadPool.GetAvailableThreads and uses a formula to came up with the number of required threads:

var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads);

这里的问题是,在我的机器上,这确实有效:

The problem here is that on my machines this effectively does:

40 (my limit) + 8 (workerThreads) - 1023 (availableThreads) 

哪个当然会返回

-975

结论是:上面来自公共交通内部的代码似乎是错误的.当我手动手动提高该限制时,ConfigureMinThreads会遵守该限制(因为它仅在它大于读取值时才设置该限制).

The conclusion is: the above code from the Mass Transit internals seems to be wrong. When I manually raise the limit in advance, the ConfigureMinThreads respects it (as it sets the limit only if it is higher than the read value).

没有预先手动设置限制,则无法设置该限制,因此代码执行的线程数与默认线程池限制(在我的计算机上为8)一样多.

Without setting the limit manually in advance, the limit fails to be set and thus the code does as much threads as the default thread pool limit (which seems to be 8 on my machine).

显然有人认为这个公式会产生结果

Apparently someone assumed this formula will yield

40 + 8 - 8

在默认情况下.为什么GetMinThreadsGetAvailableThreads返回这样的不相关值尚待确定...

in a default scenario. Why GetMinThreads and GetAvailableThreads return such unrelated values is yet to be determined...

更新#2

更改

    static int CalculateRequiredThreads( int consumerThreads )
    {
        int workerThreads;
        int completionPortThreads;
        ThreadPool.GetMinThreads( out workerThreads, out completionPortThreads );
        int availableWorkerThreads;
        int availableCompletionPortThreads;
        ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
        var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
        return requiredThreads;
    }

    static int CalculateRequiredThreads( int consumerThreads )
    {
        int workerThreads;
        int completionPortThreads;
        ThreadPool.GetMaxThreads( out workerThreads, out completionPortThreads );
        int availableWorkerThreads;
        int availableCompletionPortThreads;
        ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
        var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
        return requiredThreads;
    }

解决了问题.两者都返回1023,并且公式的输出是正确数量的预期线程.

resolves the issue. Both return 1023 here and the output of the formula is a correct number of expected threads.

这篇关于MassTransit封顶消息速率为10的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆