完成或放弃邮件时发生Azure.Messaging.ServiceBus错误 [英] Azure.Messaging.ServiceBus Error when completing or abandoning message
问题描述
SOM我目前正在考虑将我们非常简单的Service Bus服务更新到最新版本(Asure.Messaging.Servicebus),我在这里遇到了一个较小的问题。
问题是,我希望通过将消息委托回服务类中的方法来处理作业,从而手动完成或放弃接收到的或达到峰值的消息。
这是我目前为止的简单类,由一个接口公开。
using myProject.Interfaces;
using myProject.Utilities;
using Azure.Messaging.ServiceBus;
using System;
namespace myProject.Services
{
/// <summary>
/// Service bus service controller
/// </summary>
public class ServiceBusService: IServiceBusService
{
private IConfigurationUtility _configurationUtility;
static string connectionString;
static ServiceBusClient client;
static ServiceBusSender sender;
static ServiceBusReceiver receiver;
public ServiceBusService(IConfigurationUtility configurationUtility)
{
_configurationUtility = configurationUtility;
connectionString = _configurationUtility.GetSetting("ServiceBusConnectionString");
client = new ServiceBusClient(connectionString);
}
/// <summary>
/// Sending message.
/// </summary>
/// <param name="messageContent"></param>
/// <param name="queueName"></param>
public void SendMessage(string messageContent, string queueName)
{
sender = client.CreateSender(queueName);
ServiceBusMessage message = new ServiceBusMessage(messageContent);
sender.SendMessageAsync(message).Wait();
}
/// <summary>
/// Receive message.
/// </summary>
/// <returns></returns>
/// <param name="queueName"></param>
public ServiceBusReceivedMessage ReceiveMessage(string queueName)
{
receiver = client.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = receiver.ReceiveMessageAsync().Result;
return receivedMessage;
}
/// <summary>
/// Peek message.
/// </summary>
/// <returns></returns>
public ServiceBusReceivedMessage PeekMessage(string queueName)
{
receiver = client.CreateReceiver(queueName);
ServiceBusReceivedMessage peekedMessage = receiver.PeekMessageAsync().Result;
return peekedMessage;
}
/// <summary>
/// Complete message.
/// </summary>
/// <param name="message"></param>
public void CompleteMessage(ServiceBusReceivedMessage message)
{
receiver.CompleteMessageAsync(message).Wait();
}
/// <summary>
/// Abandon message.
/// </summary>
/// <param name="message"></param>
public void AbandonMessage(ServiceBusReceivedMessage message)
{
receiver.AbandonMessageAsync(message).Wait();
}
}
}
当我一次处理一条消息时,一切工作正常,但如果我一次处理两条消息,则会收到以下错误:
提供的锁无效(&P>&Q;)。锁定已过期,或消息已从队列中删除,或由另一个接收方实例";接收。
并且我认为部分是我的问题所在。或由另一个接收方实例&接收。
我是不是想错了?我是否应该能够处理例如:一次放弃多个单个接收的邮件?
提前谢谢。
更新:
所以我认为我设法让它工作,因为Jesse Squire被建议做什么,以反映由Microsoft提供的新的Servicebus Ducumentation。
我创建了一个名为Servicebus Factory的新类,添加了下面Jesse提供的代码来测试它,并更改了示例顶部已初始化";的";部分的";=new();,因为这样初始化它们会导致以下错误:
CS8400的"目标类型对象创建"功能在C#8.0中不可用。请使用9.0或更高版本的语言。
结果:
public class ServiceBusFactory : IAsyncDisposable, IServiceBusFactory
{
private readonly ServiceBusClient _client;
private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new ConcurrentDictionary<string, ServiceBusSender>();
private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new ConcurrentDictionary<string, ServiceBusReceiver>();
public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);
public ServiceBusSender GetSender(string entity) =>
_senders.GetOrAdd(entity, entity => _client.CreateSender(entity));
public ServiceBusReceiver GetReceiver(string entity) =>
_receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));
public async ValueTask DisposeAsync()
{
await _client.DisposeAsync().ConfigureAwait(false);
GC.SuppressFinalize(this);
}
}
}
然后,我为我的服务总线工厂创建了一个接口,并将其作为Singelton添加到我的";Startup.cs";类中我的";ConfigurationService";方法的底部,并使用我的服务总线连接字符串对其进行初始化。
界面:
public interface IServiceBusFactory
{
public ServiceBusSender GetSender(string entity);
public ServiceBusReceiver GetReceiver(string entity);
public ValueTask DisposeAsync();
}
启动:
var serviceBusConnectionString = "[CONNECTION STRING]";
services.AddSingleton<IServiceBusFactory>(new ServiceBusFactory(serviceBusConnectionString));
最后,这是一个依赖关系的问题将servicebus Factory接口注入到我的控制器构造函数中,然后使用它来获取";发送者";,并使用它向我的队列发送一条名为";Add";的消息。
构造函数:
private readonly IServiceBusFactory _serviceBusFactory;
public ServicebusController(IServiceBusFactory serviceBusFactory)
{
_serviceBusFactory = serviceBusFactory;
}
控制器方法/操作实现:
var test = new List<ServiceBusMessage>();
test.Add(new ServiceBusMessage("This is a test message."));
var sender = _serviceBusFactory.GetSender("Add");
sender.SendMessagesAsync(test);
AMQP
服务总线将消息锁与从中接收消息的推荐答案链接关联。对于SDK,这意味着您必须使用与接收消息相同的ServiceBusReceiver
实例来结算消息。
在您的代码中,您正在为ReceiveMessage
调用创建一个新的接收方-因此,当您尝试完成或放弃消息时,如果发生了对ReceiveMessage
的任何其他调用,则您使用的是一个消息无效的链接。
我建议将发送方/接收方集中在一起,并将每个发送方/接收方作为关联队列的单例。对于给定队列,对SendMessage
或ReceiveMessage
的每个调用都应使用相同的发送方/接收方实例。
当您的应用程序关闭时,请确保关闭或处置ServiceBusClient
,这将确保其所有子发送方/接收方也得到适当清理。
我还强烈建议将您的类重构为异步类。您正在使用的基于异步的同步模式将会给线程池带来额外的压力,并且很可能导致线程饥饿和/或负载下的死锁。
更新
为了添加一些额外的上下文,我建议不要包装Service Bus操作,而是使用一个工厂来专注于管理客户端并让调用者直接与它们交互。
这可确保客户端被池化并正确管理它们的生存期,同时还允许调用方灵活地保留发送方/接收方引用并用于多个操作,而不是支付检索它的成本。
作为示例,下面是一个简单的工厂类,您将在应用程序中将其作为单例创建和管理。调用方能够为特定队列/主题/订阅请求发送方/接收方,并将根据需要创建它们,然后将其池化以供重复使用。
// This class is intended to be treated as a singleton for most
// scenarios. Each instance created will open an independent connection
// to the Service Bus namespace, shared by senders and receivers spawned from it.
public class ServiceBusFactory : IAsyncDisposable
{
// If throughput needs scale beyond a single connection, the factory can
// manage multiple clients and ensure that child entities are evenly distributed
// among them.
private readonly ServiceBusClient _client;
private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new();
private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new();
public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);
public ServiceBusSender GetSender(string entity) =>
_senders.GetOrAdd(entity, entity => _client.CreateSender(entity));
public ServiceBusReceiver GetReceiver(string entity) =>
_receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));
public async ValueTask DisposeAsync()
{
await _client.DisposeAsync().ConfigureAwait(false);
GC.SuppressFinalize(this);
}
}
这篇关于完成或放弃邮件时发生Azure.Messaging.ServiceBus错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!