并发处理Rabbitmq消息 [英] Handle rabbitmq messages concurrenrtly

查看:463
本文介绍了并发处理Rabbitmq消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在此处询问了为什么开始使用Thread.Run的进程未执行预期的并发请求.

I asked a question here about why starting a process using Thread.Run did not execute as many concurrent requests as I expected.

此问题背后的原因是我试图创建一个类,该类可以将消息从Rabbitmq队列中拉出并并发地处理它们,直到并发消息的最大数量为止.

The reason behind this question was that I was trying to create a class which can pull messages off a rabbitmq queue and process them concurrently up to a maximum number of concurrent messages.

为此,我在EventingBasicConsumer类的Received处理程序中得到了以下内容.

To do this I ended up with the following in the Received handler of the EventingBasicConsumer class.

async void Handle(EventArgs e) 
{
    await _semaphore.WaitAsync();

    var thread = new Thread(() =>
    {
        Process(e);
        _semaphore.Release(); 
        _channel.BasicAck(....);
    });
    thread.Start();
} 

但是,除非进行CPU绑定工作,否则上一篇文章中的评论都不会启动线程.

However the comments on the previous post were not to start a thread unless doing CPU bound work.

上面的处理程序不知道工作是否受CPU约束,网络,磁盘或其他. (Process是抽象方法).

The above handler does not know whether the work will be CPU bound, Network, Disk or otherwise. (Process is an abstract method).

即使如此,我想我也必须在这里启动线程或任务,否则Process方法会阻塞Rabbitmq线程,直到完成后才再次调用事件处理程序.因此,我一次只能处理一种方法.

Even so I think I have to start a thread or task here, otherwise the Process method blocks the rabbitmq thread and the event handler is not called again until it is finished. So I can only handle one method at once.

在这里开始新的Thread可以吗?最初,我使用Task.Run,但是这并没有产生想要的那么多工人.参见其他帖子.

Is starting a new Thread here okay? Originally I had used Task.Run but this didn't produce as many workers as wanted. See other post.

仅供参考.通过在信号量上设置InitialCount可以限制并发线程数.

FYI. The number of concurrent threads is capped by setting the InitialCount on the semaphore.

推荐答案

正如链接问题中已经提到的那样,大量线程并不能保证性能,就好像它们的数量超过了逻辑核心的数量一样,您没有真正完成工作的情况线程饥饿.

As already been said in linked question, big number of threads doesn't guarantee the performance, as if their number gets more than the number of logical cores, you got a thread starvation situation with no real work being done.

但是,如果仍然需要处理并发操作数,则可以尝试使用设置

However, if you still need to handle the number of a concurrent operations, you may give a try to the TPL Dataflow library, with settings up the MaxDegreeOfParallelism, like in this tutorial.

var workerBlock = new ActionBlock<EventArgs>(
    // Process event
    e => Process(e),
    // Specify a maximum degree of parallelism.
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = InitialCount
    });
var bufferBlock = new BufferBlock();
// link the blocks for automatically propagading the messages
bufferBlock.LinkTo(workerBlock);

// asynchronously send the message
await bufferBlock.SendAsync(...);
// synchronously send the message
bufferBlock.Post(...);

BufferBlock是一个队列,因此将保留消息的顺序.另外,您可以通过将块与过滤器lambda链接来添加不同的处理程序(具有不同的并行度):

BufferBlock is a queue, so the order of messages will be preserved. Also, you can add the different handlers (with a different degree of parallelism) with linking the blocks with filter lambda:

bufferBlock.LinkTo(cpuWorkerBlock, e => e is CpuEventArgs);
bufferBlock.LinkTo(networkWorkerBlock, e => e is NetworkEventArgs);
bufferBlock.LinkTo(diskWorkerBlock, e => e is DiskEventArgs);

,但是在这种情况下,您应该在链的末尾设置默认处理程序,这样消息就不会消失(您可以使用

but in this case you should setup a default handler at the end of the chain, so the message wouldn't disappear (you may use a NullTarget block for this):

bufferBlock.LinkTo(DataflowBlock.NullTarget<EventArgs>);

此外,该块可能是观察者,因此它们可以与位于用户界面一侧.

Also, the block could be an observers, so they perfectly work with Reactive Extensions on UI side.

这篇关于并发处理Rabbitmq消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆