Azure Service Bus会话 - 可能的饥饿 [英] Azure Service Bus Sessions - possible starvation

查看:65
本文介绍了Azure Service Bus会话 - 可能的饥饿的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个后台处理器,它可以监听服务总线队列,并且每个会话ID都需要按顺序处理消息。


我想确保不再需要并行处理超过5条消息 - 这意味着我想一次处理5个会话。


如果在任何给定时间有超过5个会话,我有什么办法可以保证所有会话最终都会被处理?


即使我删除某个会话然后接受新会话,也可能会再次处理同一会话。 我可以确保接受的会话是包含最旧的未处理消息的会话吗?

解决方案

您可以在  Message
sessions:first in,first out(FIFO)


一条新消息只有在先前的消息已经完成或死字母时才能获得。放弃邮件会导致同一邮件在下一次接收操作时再次投放



关于并行处理,你可以有一个线程池对于处理,每次收到消息时,您只需抓取其中一个池并为其分配消息。您需要管理该池。



或者,您可以一次检索多条消息并使用TPL处理它们...例如,方法  BeginReceiveBatch / EndReceiveBatch  允许
你检索多个"项"从队列(Async)然后使用"AsParallel"转换前面方法返回的IEnumerable并处理多个线程中的消息。




 var messages = await Task.Factory.FromAsync< IEnumerable< BrokeredMessage>>(Client.BeginReceiveBatch(3,null,null),Client.EndReceiveBatch); 

messages.AsParallel()。WithDegreeOfParallelism(3).ForAll(item =>
{
ProcessMessage(item);
});

该代码检索3条消息从队列和进程然后在"3个线程"中(注意:不保证它将使用3个线程,.NET将分析系统资源
,如果需要,它将最多使用3个线程)




I have a background processor that listens to a service bus queue and messages need to be processed in order per Session ID.

I'd like to ensure that no more than 5 messages are processed in parallel - meaning I'd like to process 5 sessions at once.

If there are more than 5 sessions at any given time, is there any way I can ensure that all sessions are eventually handled?

Even if I drop a certain session and then accept a new session after that, the same session might be processed again.  Can I ensure that the accepted session is the one that contains the oldest unprocessed message?

解决方案

You can find the answer in Message sessions: first in, first out (FIFO)

A new message can only be obtained when the prior message has been completed or dead-lettered. Abandoning a message causes the same message to be served again with the next receive operation.

About parallel processing, you could have a pool of threads for processing, and every time you get a message, you just grab one of that pool and assign it a message. You need to manage that pool.

OR, you could retrieve multiple messages at once and process them using TPL... for example, the method BeginReceiveBatch/EndReceiveBatch allows you to retrieve multiple "items" from Queue (Async) and then use "AsParallel" to convert the IEnumerable returned by the previous methods and process the messages in multiple threads.


var messages = await Task.Factory.FromAsync<IEnumerable<BrokeredMessage>>(Client.BeginReceiveBatch(3, null, null), Client.EndReceiveBatch);

messages.AsParallel().WithDegreeOfParallelism(3).ForAll(item =>
{
    ProcessMessage(item);
});

That code retrieves 3 messages from queue and processes then in "3 threads" (Note: it is not guaranteed that it will use 3 threads, .NET will analyze the system resources and it will use up to 3 threads if necessary)


这篇关于Azure Service Bus会话 - 可能的饥饿的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆