RabbitMq以异步方式处理收到的消息 [英] RabbitMq Handle Received message in async way
本文介绍了RabbitMq以异步方式处理收到的消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在使用RabbitMq
处理在总线上收到的消息.我想知道是否有更好的方法来处理收到的消息(也许使用async/await
模式)
I'm using RabbitMq
to process messages I receive on a bus. I was wondering if there's a better way to process the message I receive (maybe using async/await
pattern)
这是我的代码段
connection = connectionFactory.CreateConnection();
channel = connection.CreateModel();
channel.QueueDeclare(queue: Constants.RabbitListeningQueue,durable: false,exclusive: false,autoDelete: false,arguments: null);
channel.QueueDeclare(queue: Constants.RabbitMqRequestInsertedQueue,durable: false,exclusive: false,autoDelete: false,arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
log.Debug($"[x] Received message :{ea}");
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var dynamicObject = JObject.Parse(message);
queueMessageHandler.HandleMessage(dynamicObject);
};
queueMessageHandler
实现如下
public class QueueMessageHandler : IQueueMessageHandler
{
private readonly IImportNucleoManager importNucleoManager;
public QueueMessageHandler(IImportNucleoManager importNucleoManager)
{
this.importNucleoManager = importNucleoManager;
}
public void HandleMessage(dynamic message)
{
switch ((string)message.Type)
{
case "T1":
{
importNucleoManager.Process(message);
break;
}
case "T3":
importNucleoManager.ProceedToInsertStep(message);
break;
}
}
}
我想知道(由于T1/T3事件需要花费很长时间),它们是否应该为Task
,所以甚至HandleMessage
也应该为HandleMessageAsync
?在这种情况下,我还必须通过async void
,这不是我所知道的最佳实践
I was wondering (since the T1/T3 events take a long time to process) should they be Task
and so even the HandleMessage
should be HandleMessageAsync
? In this case, I also have to pass an async void
which is not a best practice as I know
推荐答案
static async Task Main(string[] args)
{
var connectionFactory = new ConnectionFactory(DispatchConsumersAsync = true);
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
}
static async Task Consumer_Received(object sender, BasicDeliverEventArgs @event)
{
await DoSomethingAsync();
}
这篇关于RabbitMq以异步方式处理收到的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文