MassTransit Consumer 从未收到消息 [英] MassTransit Consumer never receives message

查看:32
本文介绍了MassTransit Consumer 从未收到消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在按照文档构建一个演示应用程序,以在 ASP.NET Core 应用程序中使用 MassTransit 与 RabbitMQ 和 Autofac:

I'm building a demo application following along the documentation for using MassTransit with RabbitMQ and Autofac in an ASP.NET Core application:

我的程序代码:

namespace MessageDemo
{
    public class Program
    {
        public static void Main(string[] args)
        {
            var host = Host.CreateDefaultBuilder(args)
                .UseServiceProviderFactory(new AutofacServiceProviderFactory())
                .ConfigureWebHostDefaults(webHostBuilder =>
                {
                    webHostBuilder
                        .UseContentRoot(Directory.GetCurrentDirectory())
                        .UseIISIntegration()
                        .UseStartup<Startup>();
                })
                .Build();
            host.Run();
        }
    }
}

我的创业:

    public class Startup
    {
        public Startup(IWebHostEnvironment env)
        {
            var builder = new ConfigurationBuilder()
                .SetBasePath(env.ContentRootPath)
                .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
                .AddJsonFile($"appsettings.{env.EnvironmentName}.json", optional: true)
                .AddEnvironmentVariables();
            this.Configuration = builder.Build();
        }

        public IConfiguration Configuration { get; }
        public ILifetimeScope AutofacContainer { get; set; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddOptions();
            services.AddControllers();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            this.AutofacContainer = app.ApplicationServices.GetAutofacRoot();

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }

        // ConfigureContainer is where you can register things directly
        public void ConfigureContainer(ContainerBuilder builder)
        {

            builder.RegisterType<DemoContent>().As<IDemoContent>();
            builder.RegisterType<WeatherForecast>().As<IWeatherForecast>();

            builder.AddMassTransit(x =>
            {
                x.AddConsumer<DemoConsumer>();

                x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
                {

                    cfg.Host("rabbitmq://my_container_ip/", host =>
                    {
                        host.Username("devuser");
                        host.Password("devuser");
                    });


                    cfg.ReceiveEndpoint("submit-data", ec =>
                    {
                        // Configure a single consumer
                        ec.ConfigureConsumer<DemoConsumer>(context);
                    });

                }));
            });
        }
    }

我的消费者:

    public class DemoConsumer : IConsumer<IDemoContent>
    {
        public async Task Consume(ConsumeContext<IDemoContent> context)
        {
            Debug.WriteLine($"Write content: {context.Message.Data}");
            await Console.Out.WriteLineAsync($"Write content: {context.Message.Data}");
        }
    }

只是为了测试,我通过点击控制器端点之一触发发布,PublishEndpoint 由容器注入:

Just for testing I'm triggering publish by hitting one of the controller endpoints, PublishEndpoint is injected by the container:

        // GET: api/Demo
        [HttpGet]
        public async void Get()
        {
            await _endpoint.Publish<IDemoContent>(new
            {
                Data = "Some random content"
            }, new CancellationToken());
        }

这一切似乎都在工作- 没有错误信息- 添加了一个使用 InMemoryTestHarness 的演示单元测试,并且正在工作- 我的 RabbitMQ 实例在 Manager Overview 中注册发布的消息

This all appears to be working - no error messages - added a demo Unit test using InMemoryTestHarness and that is working - my RabbitMQ instance registers the published message in the Manager Overview

我在 RabbitMQ 管理 UI 中获得发布者确认,但消息显示为 Unrouteable(drop).

I am getting Publisher Confirmation in the RabbitMQ management UI but the messages show up as Unroutable(drop).

推荐答案

由于您没有添加托管服务,因此总线无法启动

The bus doesn't start since you haven't added the hosted service

services.AddMassTransitHostedService();

就在代码片段中在文档中.

这篇关于MassTransit Consumer 从未收到消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆