Masstransit Azure服务总线和DI请求/响应模型 [英] Masstransit Azure Service Bus and DI Request/Response model

查看:71
本文介绍了Masstransit Azure服务总线和DI请求/响应模型的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我遇到了Azure Service Bus以及与Masstransit和ASP.NET Core DI集成的问题.当我尝试为 IBus IRequestClient 设置DI并发送消息以请求端点时,我收到RequestTimeoutException,但请求端点获取了消息并对其进行了处理.当我尝试使用RabbitMQ进行相同操作或使用Azure Service Bus在本地创建IBus时,一切正常.仅当我尝试通过ASP.NET Core DI设置Azure Service Bus时,此问题才适用.

I faced with issue of Azure Service Bus and integration with Masstransit and ASP.NET Core DI. When i try to setup DI for IBus and IRequestClient and send message to request endpoint i get RequestTimeoutException but request endpoint get message and processed it. When i'll try to do same using RabbitMQ or create IBus locally using Azure Service Bus, all work fine. Issue only apply when i try to setup Azure Service Bus by ASP.NET Core DI.

设置天蓝色服务总线:

public class Startup
{
    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }

    public IConfiguration Configuration { get; }

    // This method gets called by the runtime. Use this method to add services to the container.
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        var bus = InitBus();

        services.AddSingleton<IPublishEndpoint>(bus);
        services.AddSingleton<ISendEndpointProvider>(bus);
        services.AddSingleton<IBus>(bus);

        var timeout = TimeSpan.FromSeconds(10);
        var serviceAddress = new Uri("url");

        services.AddScoped<IRequestClient<SubmitOrder, OrderAccepted>>(x =>
            new MessageRequestClient<SubmitOrder, OrderAccepted>(x.GetRequiredService<IBus>(), serviceAddress, timeout, timeout));

        bus.Start();
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        if (env.IsDevelopment()) app.UseDeveloperExceptionPage();
        var bus = InitBus();

        bus.Start();

        app.UseMvc();
        //            app.Run();
    }

    public IBusControl InitBus()
    {
        var bus = Bus.Factory.CreateUsingAzureServiceBus(x => x.Host(new Uri(""), h =>
        {
            h.OperationTimeout = TimeSpan.FromSeconds(5);
            h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("KeyName", "KeyValue");
        }));

        return bus;
    }
}

致电IRequestClient

Call IRequestClient

[Route("api/[controller]")]
public class ValuesController : Controller
{
    private readonly IRequestClient<SubmitOrder, OrderAccepted> _requestClient;

    public ValuesController(IRequestClient<SubmitOrder, OrderAccepted> requestClient)
    {
        _requestClient = requestClient;
    }

    [HttpGet("{id}")]
    public async Task<IActionResult> Get(int id)
    {
        try
        {
            OrderAccepted result = await _requestClient.Request(new { OrderId = id });

            return Accepted(result.OrderId);
        }
        catch (RequestTimeoutException exception)
        {
            return StatusCode((int)HttpStatusCode.RequestTimeout);
        }
    }
}

接收器设置:

 class Program
{
    static void Main(string[] args)
    {
        Start().Wait();
    }

    public static async Task Start()
    {
        var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
            var host = cfg.Host(new Uri("url"), h =>
            {
                h.OperationTimeout = TimeSpan.FromSeconds(5);
                h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("KeyName", "KeyValue=");
            });

            cfg.ReceiveEndpoint(host, "order-service", e =>
            {
                e.Handler<SubmitOrder>(context => context.RespondAsync<OrderAccepted>(new
                {
                    context.Message.OrderId
                }));
            });
        });

        await bus.StartAsync();

        try
        {
            Console.WriteLine("Working....");

            Console.ReadLine();
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
        }
        finally
        {
            await bus.StopAsync();
        }
    }
}

推荐答案

首先,我建议查看创建的示例(该示例使用RabbitMQ,但应轻松转换为Azure Service Bus):

First, I'd suggest looking at the sample created (which uses RabbitMQ, but should easily be converted to Azure Service Bus):

https://github.com/phatboyg/Sample-DotNetCore-DI

使用此示例可以正确完成DI使用模式.关于您的示例,有些事情不太正确.您还使用了无法正确捕获 ConsumeContext 进行响应跟踪的旧式请求客户端.

The DI usage patterns are properly done using this sample. There are a few things about your example that aren't quite right. You're also using the legacy request client that doesn't properly capture the ConsumeContext for the response tracking.

这篇关于Masstransit Azure服务总线和DI请求/响应模型的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆