Each time I publish a message, only one of the consumers fires, seemingly at random


Problem description

I use MassTransit and RabbitMQ in a sample project. I publish a message and expect it to be consumed by two separate consumers, but each time only one of the consumers fires, seemingly at random. What is the problem?

This is the sender method:

 await _publishEndpoint.Publish(new ConsultantFinishedEvent(50));

This is the configuration in the publisher's application startup:

#region Service Bus Config

public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
    services.AddMassTransit(x =>
    {
        RegisterMessageConsumers(x);
        RegisterRequestClients(x);
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.ConfigureEndpoints(context);
            cfg.Host(configuration["EventBusConnection"], h =>
            {
                h.Username(configuration["EventBusUserName"]);
                h.Password(configuration["EventBusPassword"]);
            });
        });
    });
    services.AddMassTransitHostedService();

    return services;
}

private static void RegisterMessageConsumers(IServiceCollectionBusConfigurator service)
{
    //service.AddConsumer<OurConsumerClass>();
}

#endregion

This is the simple consumer:

public class ConsultantFinishedEventConsumer : IConsumer<ConsultantFinishedEvent>
{
    public Task Consume(ConsumeContext<ConsultantFinishedEvent> context)
    {
        int xx = 0;
        return null;
    }
}

And this is the configuration that I use in the startup of every consumer service:

#region Service Bus

public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
    services.AddMassTransit(x =>
    {
        RegisterMessageConsumers(x);
        RegisterRequestClients(x);
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.ConfigureEndpoints(context);
            cfg.Host(configuration["EventBusConnection"], h =>
            {
                h.Username(configuration["EventBusUserName"]);
                h.Password(configuration["EventBusPassword"]);
            });
            //Register Consumers
            cfg.ReceiveEndpoint("PatintQueue", ep =>
            {
                ep.PrefetchCount = 16;
            });
        });
    });
    services.AddMassTransitHostedService();
    return services;
}

private static void RegisterMessageConsumers(IServiceCollectionBusConfigurator service)
{
    service.AddConsumer<ConsultantFinishedEventConsumer>();
}

#endregion

Solution

First off, your publisher configuration order needs a little adjustment:

public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
    services.AddMassTransit(x =>
    {
        RegisterMessageConsumers(x);
        RegisterRequestClients(x);

        x.UsingRabbitMq((context, cfg) =>
        {
            // Configure the broker connection first.
            cfg.Host(configuration["EventBusConnection"], h =>
            {
                h.Username(configuration["EventBusUserName"]);
                h.Password(configuration["EventBusPassword"]);
            });

            // Call ConfigureEndpoints last.
            cfg.ConfigureEndpoints(context);
        });
    });
    services.AddMassTransitHostedService();

    // The method is declared to return IServiceCollection, so return it.
    return services;
}

Notice that ConfigureEndpoints should be called at the end of the configuration closure, after cfg.Host.

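Additionally, a method that returns Task should never return null. The caller awaits the returned task, and awaiting null throws a NullReferenceException. Return Task.CompletedTask instead: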

public class ConsultantFinishedEventConsumer : IConsumer<ConsultantFinishedEvent>
{
    public Task Consume(ConsumeContext<ConsultantFinishedEvent> context)
    {
        int xx = 0;

        return Task.CompletedTask;
    }
}
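
To see why returning null matters, here is a minimal, self-contained sketch that does not use MassTransit at all. The names NullTaskDemo, ReturnsNull, ReturnsCompleted and Probe are made up for this illustration; the sketch only shows what happens when a caller awaits each kind of return value.

```csharp
using System;
using System.Threading.Tasks;

public static class NullTaskDemo
{
    // Same shape as the original consumer: declared to return Task, returns null.
    public static Task ReturnsNull() => null;

    // The corrected shape: an already-completed task.
    public static Task ReturnsCompleted() => Task.CompletedTask;

    // Awaits the handler the way a caller would and reports the outcome.
    public static async Task<string> Probe(Func<Task> handler)
    {
        try
        {
            await handler();
            return "ok";
        }
        catch (NullReferenceException)
        {
            // Awaiting a null Task fails when the awaiter is requested.
            return "NullReferenceException";
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(await Probe(ReturnsNull));      // NullReferenceException
        Console.WriteLine(await Probe(ReturnsCompleted)); // ok
    }
}
```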

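Finally, the configuration on your consumer side needs adjustment as well. Move ConfigureEndpoints to the end, and drop the manual ReceiveEndpoint, which has no consumer attached to it: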

public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
    services.AddMassTransit(x =>
    {
        RegisterMessageConsumers(x);
        RegisterRequestClients(x);

        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host(configuration["EventBusConnection"], h =>
            {
                h.Username(configuration["EventBusUserName"]);
                h.Password(configuration["EventBusPassword"]);
            });

            // Creates the receive endpoints for the consumers registered above.
            cfg.ConfigureEndpoints(context);
        });
    });
    services.AddMassTransitHostedService();

    return services;
}

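By default, MassTransit load-balances across instances of the same consumer: every instance that registers the consumer under the same endpoint name reads from the same queue, so each published message is delivered to only one of them. If you want every instance to receive its own copy of the message, give the consumer endpoint a unique InstanceId so that each instance gets its own queue. Alternatively, you can mark the endpoint as temporary, so that the unique endpoint exists only while the bus instance is running.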
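
The difference between a shared queue and one queue per instance can be illustrated with a toy model. This is a simplified sketch, not MassTransit or RabbitMQ code: ToyBroker, Attach and Publish are hypothetical names, the queue and service names are placeholders, and the round-robin dispatch is an assumption standing in for the broker's real delivery logic.

```csharp
using System;
using System.Collections.Generic;

// Toy model: every queue receives a copy of each published message,
// and the consumers attached to one queue take turns handling it.
public sealed class ToyBroker
{
    private readonly Dictionary<string, List<string>> _consumersByQueue =
        new Dictionary<string, List<string>>();
    private readonly Dictionary<string, int> _nextIndex = new Dictionary<string, int>();

    public void Attach(string queue, string consumer)
    {
        if (!_consumersByQueue.TryGetValue(queue, out var consumers))
        {
            consumers = new List<string>();
            _consumersByQueue[queue] = consumers;
            _nextIndex[queue] = 0;
        }
        consumers.Add(consumer);
    }

    // Returns the consumers that handle this one published message.
    public List<string> Publish()
    {
        var receivers = new List<string>();
        foreach (var pair in _consumersByQueue)
        {
            var index = _nextIndex[pair.Key];
            receivers.Add(pair.Value[index % pair.Value.Count]);
            _nextIndex[pair.Key] = index + 1;
        }
        return receivers;
    }
}

public static class Program
{
    public static void Main()
    {
        // Both services share one queue: only one of them gets each message.
        var shared = new ToyBroker();
        shared.Attach("PatientQueue", "service-A");
        shared.Attach("PatientQueue", "service-B");
        Console.WriteLine(shared.Publish().Count); // 1

        // Each service has its own queue: both get every message.
        var separate = new ToyBroker();
        separate.Attach("PatientQueue-A", "service-A");
        separate.Attach("PatientQueue-B", "service-B");
        Console.WriteLine(separate.Publish().Count); // 2
    }
}
```

In the model, a unique InstanceId corresponds to moving from the first setup to the second: each instance ends up with its own queue name.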

To configure a unique persistent instanceId:

private static void RegisterMessageConsumers(IServiceCollectionBusConfigurator service)
{
    service.AddConsumer<ConsultantFinishedEventConsumer>()
        .Endpoint(e =>
        {
            e.Name = "PatientQueue";
            e.InstanceId = "some-unique-value";
            e.PrefetchCount = 16;
        });
}

To configure the consumer's endpoint as temporary, so that it is automatically removed when the instance exits:

private static void RegisterMessageConsumers(IServiceCollectionBusConfigurator service)
{
    service.AddConsumer<ConsultantFinishedEventConsumer>()
        .Endpoint(e =>
        {
            e.Name = "PatientQueue";
            e.InstanceId = "some-unique-value";
            e.PrefetchCount = 16;

            e.Temporary = true;
        });
}
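
How to pick "some-unique-value" is left open above. One possible approach, sketched here under assumptions: a persistent endpoint probably wants an id that stays the same across restarts (otherwise each restart would leave an old durable queue behind), while a temporary endpoint can use a fresh random id. The InstanceIds class, its methods and the "Billing Service" name are hypothetical and not part of MassTransit.

```csharp
using System;

public static class InstanceIds
{
    // Stable across restarts: derived only from a name the deployment controls.
    public static string Stable(string serviceName)
    {
        if (string.IsNullOrWhiteSpace(serviceName))
            throw new ArgumentException("A service name is required.", nameof(serviceName));

        return serviceName.Trim().ToLowerInvariant().Replace(' ', '-');
    }

    // Different on every start: only sensible together with Temporary = true.
    public static string Ephemeral() => Guid.NewGuid().ToString("N");
}

public static class Program
{
    public static void Main()
    {
        Console.WriteLine(InstanceIds.Stable("Billing Service")); // billing-service
        Console.WriteLine(InstanceIds.Ephemeral().Length);        // 32
    }
}
```

The returned string would then be assigned to e.InstanceId in place of the placeholder value.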
