Kaska Producer with MassTransit-IBusInstance未注册 [英] Kafka Producer with MassTransit - IBusInstance has not been registered

查看:51
本文介绍了Kaska Producer with MassTransit-IBusInstance未注册的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 MassTransit

建立一个Kafka用户.

我有这段代码

  var services = new ServiceCollection();services.AddMassTransit(x =>{x.AddRider(rider =>{rider.AddProducer< string,Request>(请求",m => m.Message.RequestId);rider.UsingKafka((context,k)=>{k.Host("localhost:9092");});});});var provider = services.BuildServiceProvider();var producer = provider.GetRequiredService< ITopicProducer< Request>>();等待producer.Produce(new Request(){RequestId ="abc123",RequestedAt = DateTime.UtcNow}); 

这是来自此处

但是当我尝试运行它时,出现此异常

 未处理的异常.System.InvalidOperationException:没有注册任何类型为'MassTransit.Registration.IBusInstance'的服务. 

查看他们网站上的示例,我发现这可能与我尚未注册RabbitMQ的事实有关

 <代码> x.UsingRabbitMq((context,cfg)=> cfg.ConfigureEndpoints(context)); 

但是我没有RabbitMQ,在这种情况下我只使用Kafka.

要向Kafka进行生产,是否有必要向其他消息代理注册总线?

解决方案

来自文档:

MassTransit v7引入的

车手提供了一种将消息从任何来源传递到总线的新方法.乘客与总线一起配置,并在启动总线时登上总线.

要添加骑手,必须有一个公交车实例.如果您不需要像RabbitMQ这样的持久传输总线,则可以使用内存中传输.

  var services = new ServiceCollection();services.AddMassTransit(x =>{x.UsingInMemory((context,cfg)=> cfg.ConfigureEndpoints(context));x.AddRider(rider =>{rider.AddProducer< string,Request>(请求",m => m.Message.RequestId);rider.UsingKafka((context,k)=>{k.Host("localhost:9092");});});}); 

需要启动和停止公交车,这也将启动/停止公交车上的所有骑手.您可以通过 IBusControl :

完成此操作

  var provider = services.BuildServiceProvider();var busControl = provider.GetRequiredService< IBusControl>();等待busControl.StartAsync(cancellationToken); 

或者如果您使用的是ASP.NET Core通用主机,则通过添加MassTransit托管服务.

  services.AddMassTransitHostedService();//在MassTransit.AspNetCore中 

I'm trying to build a Kafka consumer using MassTransit

I have this piece of code

var services = new ServiceCollection();
services.AddMassTransit(x =>
{   
    x.AddRider(rider =>
    {
        rider.AddProducer<string, Request>("request", m => m.Message.RequestId);
        
        rider.UsingKafka((context, k) =>
        {
            k.Host("localhost:9092");
        });
    });
});

var provider = services.BuildServiceProvider();
var producer = provider.GetRequiredService<ITopicProducer<Request>>();

await producer.Produce(new Request()
{
    RequestId = "abc123",
    RequestedAt = DateTime.UtcNow
});

This is the simplest example of a producer from here

but when I try to run it, I get this exception

Unhandled exception. System.InvalidOperationException: No service for type 'MassTransit.Registration.IBusInstance' has been registered.

Looking at the example from their website, I see that it could be related to the fact that I haven't registered a RabbitMQ

x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));

But I don't have a RabbitMQ, I only use Kafka in this scenario.

Is it necessary to register a bus with some other message broker in order to produce to Kafka?

解决方案

From the documentation:

Riders, introduced with MassTransit v7, provide a new way to deliver messages from any source to a bus. Riders are configured along with a bus, and board the bus when it is started.

To add riders, there must be a bus instance. If you don't need a bus with a durable transport such as RabbitMQ, you can use the in-memory transport.

var services = new ServiceCollection();
services.AddMassTransit(x =>
{  
    x.UsingInMemory((context,cfg) => cfg.ConfigureEndpoints(context));

    x.AddRider(rider =>
    {
        rider.AddProducer<string, Request>("request", m => m.Message.RequestId);
        
        rider.UsingKafka((context, k) =>
        {
            k.Host("localhost:9092");
        });
    });
});

The bus needs to be started and stopped, which will also start/stop any riders on the bus. You can do this via IBusControl:

var provider = services.BuildServiceProvider();
var busControl = provider.GetRequiredService<IBusControl>();

await busControl.StartAsync(cancellationToken);

Or by adding the MassTransit Hosted Service if you're using the ASP.NET Core Generic Host.

services.AddMassTransitHostedService(); // in MassTransit.AspNetCore

这篇关于Kaska Producer with MassTransit-IBusInstance未注册的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆