Azure服务总线-在OnMessageAsync中读取的消息到队列的末尾 [英] Azure Service Bus - Readd message in OnMessageAsync to the end of the queue

查看:172
本文介绍了Azure服务总线-在OnMessageAsync中读取的消息到队列的末尾的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们正在使用Microsoft Azure服务总线发送命令消息,并且正在使用OnMessage方法从总线中获取命令消息.在我们实现IMessageSessionAsyncHandler.OnMessageAsync的内部,可能是我们意识到一条消息尚未准备好进行处理.因此,我想从总线上获取消息并将其读取到队列末尾.获取/读取操作必须是原子的.我该如何实现?

We are using the Microsoft Azure Service Bus to send command messages and we are using the OnMessage approach to get them from the bus. Inside of our implementation of IMessageSessionAsyncHandler.OnMessageAsync, it can be that we realize that a message is not ready for processing. So I want to get the message from the bus and readd them to the end of the queue . The get/readd operations must be atomic. How can I achieve this?

那是我当前的解决方案(高度抽象),但我担心它不是原子性的.

Thats my current solution (highly abstracted) but I am afraid about the non-atomicity.

queueClient.RegisterSessionHandlerFactory(new CommandSessionHandlerFactory(queueClient), ...);

internal class CommandSessionHandlerFactory : IMessageSessionAsyncHandlerFactory
{
    private readonly QueueClient _queueClient;
    public CommandSessionHandlerFactory(QueueClient queueClient)
    {
        _queueClient = queueClient;
    }

    public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message)
    {
        return new CommandSessionHandlerAsync(_queueClient);
    }
}

internal class CommandSessionHandlerAsync : MessageSessionAsyncHandler
{
    private readonly QueueClient _queueClient;
    public CommandSessionHandlerAsync(QueueClient queueClient)
    {
        _queueClient = queueClient;
    }

    protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
    {
        if (!messageReadyForProcessing)
        {
            // How to get the following code transactional safe?
            var clonedMessage = message.Clone();
            await message.CompleteAsync();
            await _queueClient.SendAsync(clonedMessage);
        }
    }
}

如何进行双重检测?我们是否必须更改克隆消息的MessageId以确保服务总线重复检测不会删除​​克隆消息?

And what about dublicate detection? Do we have to change the MessageId of the cloned message to be sure that the service bus duplicate detection does not drop the cloned message?

推荐答案

如何进行双重检测?我们是否需要更改MessageId 克隆的消息,以确保服务总线重复 检测不会删除​​克隆的消息?

And what about dublicate detection? Do we have to change the MessageId of the cloned message to be sure that the service bus duplicate detection does not drop the cloned message?

如果启用了重复检测.您的消息将从队列中删除.因为您使用相同的MessageId且重复检测在给定时间段内(默认10分钟)跟踪messageId.

If duplicate detection is enabled. Your message will be removed from queue. Because you are using same MessageId and duplicate detection tracks messageId in giving period time (10 mins default).

      if (!messageReadyForProcessing)
        {
            // How to get the following code transactional safe?
            var clonedMessage = message.Clone();
            await message.CompleteAsync();
            await _queueClient.SendAsync(clonedMessage);
        }

您可以更改为此:

         if (!messageReadyForProcessing)
            {
                await message.CompleteAsync();
                message.MessageId = Guid.NewGuid().ToString();
                await _queueClient.SendAsync(message);
            }

但是仍然存在问题.如果消息成功完成,但发送消息失败,该怎么办?您将丢失消息.重试策略可以解决此问题.但是,它很脏,不能100%保证.

But still there is a problem. What If message completes successfully, but it fails on sending message ? You will lose message. Retry policy can solve this. However it's dirty and not 100% guarantee.

您可以增加Max DeliveryCount.除了再次发送消息外,只需放弃(释放)消息即可再次使用.

You can increase your Max DeliveryCount. Beside of sending message again, just abandon(release) it to use again.

        if (!messageReadyForProcessing)
            {
                await message.AbandonAsyncy();

            }

这更好.您将确定,这是事务性的.即使超过最大交付数,也会进入死队列.

This is better. You will be sure, it's transactional. Even if it exceeds max delivery count, it will go to dead queue.

但是仍然有一个缺点.真正的定向消息将如何处理?因此,如果您有一条永远不会被处理的消息,它将吸引您的消费者.因为我们增加了最大投放次数.

But there is still a drawback. What will happen to real posined messages ? So if you have a message that never will be processed, it will posion your consumers. Because we have increased max delivery count.

简而言之:OnMessage不适用于您的解决方案.当您准备好处理消息时,只需从队列中获取消息即可.我认为这是最适合您的解决方案.

In short: OnMessage is not good for your solution. Just get message from queue, when you are ready to process it. I think this is the most suitable solution for you.

   if (messageReadyForProcessing)
     {
        var mySession=QueueClient.AcceptMessageSession();
             var message = mySession.Receive();
     }

您可以在服务总线上使用 TransactionScope .您应该在作用域中的同一队列上工作.

You can use TransactionScope with service bus. You should work on same queue in scope.

Ps:交易范围不支持放弃.

Ps: transaction scope does not support abandon.

因此您可以应用此:

    if (!messageReadyForProcessing)
      {
         using (TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
            { 
                    await message.CompleteAsync();
                    message.MessageId = Guid.NewGuid().ToString();
                    await _queueClient.SendAsync(message);
          scope.Complete();                
      }
     }

检查代理消息:事务

我不确定这是否适用于异步.因此,您可以测试并检查与Service Bus进行异步事务,如果操作不成功的话

I'm not sure this will work with asyncs. So you can test and check Asynchronous Transactions with Service Bus, if it not success.

Ps:他们说网络框架上有一个关于transactionscope和asyncs的错误.他们建议您使用4.5.1或更高版本.在此处查看更多信息.

Ps: They say there was a bug on net framework about transactionscope and asyncs. They suggest you to use 4.5.1 or higher. Check here fore more.

这篇关于Azure服务总线-在OnMessageAsync中读取的消息到队列的末尾的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆