完成或放弃邮件时发生Azure.Messaging.ServiceBus错误 [英] Azure.Messaging.ServiceBus Error when completing or abandoning message

查看:25
本文介绍了完成或放弃邮件时发生Azure.Messaging.ServiceBus错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

SOM我目前正在考虑将我们非常简单的Service Bus服务更新到最新版本(Asure.Messaging.Servicebus),我在这里遇到了一个较小的问题。

问题是,我希望通过将消息委托回服务类中的方法来处理作业,从而手动完成或放弃接收到的或达到峰值的消息。

这是我目前为止的简单类,由一个接口公开。

using myProject.Interfaces;
using myProject.Utilities;
using Azure.Messaging.ServiceBus;
using System;

namespace myProject.Services
{
    /// <summary>
    /// Service bus service controller
    /// </summary>
    public class ServiceBusService: IServiceBusService
    {
        private IConfigurationUtility _configurationUtility;
        static string connectionString;
        static ServiceBusClient client;
        static ServiceBusSender sender;
        static ServiceBusReceiver receiver;

        public ServiceBusService(IConfigurationUtility configurationUtility)
        {
            _configurationUtility = configurationUtility;
            connectionString = _configurationUtility.GetSetting("ServiceBusConnectionString");
            client = new ServiceBusClient(connectionString);
        }

        /// <summary>
        /// Sending message.
        /// </summary>
        /// <param name="messageContent"></param>
        /// <param name="queueName"></param>
        public void SendMessage(string messageContent, string queueName)
        {
            sender = client.CreateSender(queueName);
            ServiceBusMessage message = new ServiceBusMessage(messageContent);
            sender.SendMessageAsync(message).Wait();
        }

        /// <summary>
        /// Receive message.
        /// </summary>
        /// <returns></returns>
        /// <param name="queueName"></param>
        public ServiceBusReceivedMessage ReceiveMessage(string queueName)
        {

            receiver = client.CreateReceiver(queueName);
            ServiceBusReceivedMessage receivedMessage = receiver.ReceiveMessageAsync().Result;
            return receivedMessage;
        }

        /// <summary>
        /// Peek message.
        /// </summary>
        /// <returns></returns>
        public ServiceBusReceivedMessage PeekMessage(string queueName)
        {
            receiver = client.CreateReceiver(queueName);
            ServiceBusReceivedMessage peekedMessage = receiver.PeekMessageAsync().Result;
            return peekedMessage;
        }

        /// <summary>
        /// Complete message.
        /// </summary>
        /// <param name="message"></param>
        public void CompleteMessage(ServiceBusReceivedMessage message)
        {
            receiver.CompleteMessageAsync(message).Wait();
        }

        /// <summary>
        /// Abandon message.
        /// </summary>
        /// <param name="message"></param>
        public void AbandonMessage(ServiceBusReceivedMessage message)
        {
            receiver.AbandonMessageAsync(message).Wait();
        }
    }
}

当我一次处理一条消息时,一切工作正常,但如果我一次处理两条消息,则会收到以下错误:

提供的锁无效(&P>&Q;)。锁定已过期,或消息已从队列中删除,或由另一个接收方实例";接收。

并且我认为部分是我的问题所在。或由另一个接收方实例&接收。

我是不是想错了?我是否应该能够处理例如:一次放弃多个单个接收的邮件?

提前谢谢。

更新:

所以我认为我设法让它工作,因为Jesse Squire被建议做什么,以反映由Microsoft提供的新的Servicebus Ducumentation。

我创建了一个名为Servicebus Factory的新类,添加了下面Jesse提供的代码来测试它,并更改了示例顶部已初始化";";部分的";=new();,因为这样初始化它们会导致以下错误:

CS8400的"目标类型对象创建"功能在C#8.0中不可用。请使用9.0或更高版本的语言。

结果:

    public class ServiceBusFactory : IAsyncDisposable, IServiceBusFactory
    {
        private readonly ServiceBusClient _client;
        private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new ConcurrentDictionary<string, ServiceBusSender>();
        private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new ConcurrentDictionary<string, ServiceBusReceiver>();

        public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
        public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);

        public ServiceBusSender GetSender(string entity) =>
            _senders.GetOrAdd(entity, entity => _client.CreateSender(entity));

        public ServiceBusReceiver GetReceiver(string entity) =>
            _receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));

        public async ValueTask DisposeAsync()
        {
            await _client.DisposeAsync().ConfigureAwait(false);
            GC.SuppressFinalize(this);
        }
    }
}

然后,我为我的服务总线工厂创建了一个接口,并将其作为Singelton添加到我的";Startup.cs";类中我的";ConfigurationService";方法的底部,并使用我的服务总线连接字符串对其进行初始化。

界面:

public interface IServiceBusFactory
{
   public ServiceBusSender GetSender(string entity);

   public ServiceBusReceiver GetReceiver(string entity);

   public ValueTask DisposeAsync();
}

启动:

var serviceBusConnectionString = "[CONNECTION STRING]";
services.AddSingleton<IServiceBusFactory>(new ServiceBusFactory(serviceBusConnectionString));

最后,这是一个依赖关系的问题将servicebus Factory接口注入到我的控制器构造函数中,然后使用它来获取";发送者";,并使用它向我的队列发送一条名为";Add";的消息。

构造函数:

private readonly IServiceBusFactory _serviceBusFactory;

public ServicebusController(IServiceBusFactory serviceBusFactory) 
{
    _serviceBusFactory = serviceBusFactory; 
}

控制器方法/操作实现:

var test = new List<ServiceBusMessage>();
test.Add(new ServiceBusMessage("This is a test message."));

var sender = _serviceBusFactory.GetSender("Add");
sender.SendMessagesAsync(test);

AMQP

服务总线将消息锁与从中接收消息的推荐答案链接关联。对于SDK,这意味着您必须使用与接收消息相同的ServiceBusReceiver实例来结算消息。

在您的代码中,您正在为ReceiveMessage调用创建一个新的接收方-因此,当您尝试完成或放弃消息时,如果发生了对ReceiveMessage的任何其他调用,则您使用的是一个消息无效的链接。

通常,您希望避免创建生命周期短的Service Bus客户端对象的模式。它们intended to be long-lived并且在应用程序的生命周期中被重用。在您的代码中,您还隐式地放弃了发送方/接收方,而不关闭它们。这将孤立AMQP链路,直到服务在20分钟后将其强制关闭以使其处于空闲状态。

我建议将发送方/接收方集中在一起,并将每个发送方/接收方作为关联队列的单例。对于给定队列,对SendMessageReceiveMessage的每个调用都应使用相同的发送方/接收方实例。

当您的应用程序关闭时,请确保关闭或处置ServiceBusClient,这将确保其所有子发送方/接收方也得到适当清理。

我还强烈建议将您的类重构为异步类。您正在使用的基于异步的同步模式将会给线程池带来额外的压力,并且很可能导致线程饥饿和/或负载下的死锁。

更新

为了添加一些额外的上下文,我建议不要包装Service Bus操作,而是使用一个工厂来专注于管理客户端并让调用者直接与它们交互。

这可确保客户端被池化并正确管理它们的生存期,同时还允许调用方灵活地保留发送方/接收方引用并用于多个操作,而不是支付检索它的成本。

作为示例,下面是一个简单的工厂类,您将在应用程序中将其作为单例创建和管理。调用方能够为特定队列/主题/订阅请求发送方/接收方,并将根据需要创建它们,然后将其池化以供重复使用。

// This class is intended to be treated as a singleton for most 
// scenarios.  Each instance created will open an independent connection
// to the Service Bus namespace, shared by senders and receivers spawned from it.

public class ServiceBusFactory : IAsyncDisposable
{
    // If throughput needs scale beyond a single connection, the factory can
    // manage multiple clients and ensure that child entities are evenly distributed
    // among them.
    
    private readonly ServiceBusClient _client;
    private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new();
    private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new();

    public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
    public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);

    public ServiceBusSender GetSender(string entity) =>
         _senders.GetOrAdd(entity, entity => _client.CreateSender(entity));

    public ServiceBusReceiver GetReceiver(string entity) =>
        _receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));

    public async ValueTask DisposeAsync()
    {
        await _client.DisposeAsync().ConfigureAwait(false);
        GC.SuppressFinalize(this);
    }
}

这篇关于完成或放弃邮件时发生Azure.Messaging.ServiceBus错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆