MassTransit Consumer never receives message
Question
I'm building a demo application, following the documentation for using MassTransit with RabbitMQ and Autofac in an ASP.NET Core application:
My Program code:
namespace MessageDemo
{
    public class Program
    {
        public static void Main(string[] args)
        {
            var host = Host.CreateDefaultBuilder(args)
                .UseServiceProviderFactory(new AutofacServiceProviderFactory())
                .ConfigureWebHostDefaults(webHostBuilder =>
                {
                    webHostBuilder
                        .UseContentRoot(Directory.GetCurrentDirectory())
                        .UseIISIntegration()
                        .UseStartup<Startup>();
                })
                .Build();

            host.Run();
        }
    }
}
My Startup:
public class Startup
{
    public Startup(IWebHostEnvironment env)
    {
        var builder = new ConfigurationBuilder()
            .SetBasePath(env.ContentRootPath)
            .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
            .AddJsonFile($"appsettings.{env.EnvironmentName}.json", optional: true)
            .AddEnvironmentVariables();
        this.Configuration = builder.Build();
    }

    public IConfiguration Configuration { get; }

    public ILifetimeScope AutofacContainer { get; set; }

    // This method gets called by the runtime. Use this method to add services to the container.
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddOptions();
        services.AddControllers();
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        this.AutofacContainer = app.ApplicationServices.GetAutofacRoot();

        app.UseRouting();
        app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllers();
        });
    }

    // ConfigureContainer is where you can register things directly
    public void ConfigureContainer(ContainerBuilder builder)
    {
        builder.RegisterType<DemoContent>().As<IDemoContent>();
        builder.RegisterType<WeatherForecast>().As<IWeatherForecast>();

        builder.AddMassTransit(x =>
        {
            x.AddConsumer<DemoConsumer>();

            x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.Host("rabbitmq://my_container_ip/", host =>
                {
                    host.Username("devuser");
                    host.Password("devuser");
                });

                cfg.ReceiveEndpoint("submit-data", ec =>
                {
                    // Configure a single consumer
                    ec.ConfigureConsumer<DemoConsumer>(context);
                });
            }));
        });
    }
}
My consumer:
public class DemoConsumer : IConsumer<IDemoContent>
{
    public async Task Consume(ConsumeContext<IDemoContent> context)
    {
        Debug.WriteLine($"Write content: {context.Message.Data}");
        await Console.Out.WriteLineAsync($"Write content: {context.Message.Data}");
    }
}
Just for testing, I trigger a publish by hitting one of the controller endpoints; the publish endpoint is injected by the container:
// GET: api/Demo
[HttpGet]
public async void Get()
{
    await _endpoint.Publish<IDemoContent>(new
    {
        Data = "Some random content"
    }, new CancellationToken());
}
This all appears to be working, with no error messages. I added a demo unit test using InMemoryTestHarness, and that works. My RabbitMQ instance registers the published messages in the management UI Overview.
I get publisher confirmations in the RabbitMQ management UI, but the messages show up as Unroutable (drop).
Recommended answer
The bus doesn't start because you haven't added the hosted service:
services.AddMassTransitHostedService();
It is right there in the code snippet in the docs.
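To make the placement concrete, here is a minimal sketch of the question's `ConfigureServices` with that one call added. It assumes a MassTransit version that still ships the `AddMassTransitHostedService()` extension (in the versions I'm aware of it comes from the MassTransit.AspNetCore package); the rest of `Startup` stays as posted. A likely reason for the Unroutable (drop) status is that the `submit-data` queue and its bindings are only created once the bus starts, so until then the broker has nowhere to route the published messages.

```csharp
// Sketch only: the question's Startup.ConfigureServices with the fix applied.
// Assumption: the MassTransit.AspNetCore package is referenced, which is where
// AddMassTransitHostedService() is expected to come from.
public void ConfigureServices(IServiceCollection services)
{
    services.AddOptions();
    services.AddControllers();

    // Registers a hosted service that starts the bus when the host starts
    // and stops it on shutdown. The bus itself is still configured in
    // ConfigureContainer via builder.AddMassTransit(...).
    services.AddMassTransitHostedService();
}
```

After restarting the application, the `submit-data` queue should appear in the RabbitMQ management UI, and a request to the controller endpoint should reach `DemoConsumer`.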