How can I configure the topic name when using MassTransit SQS?


Problem description


I use MassTransit with Amazon SQS. When publishing a message, I get this error:

User is not authorized to perform: SNS:CreateTopic on resource

So I need permission to create topics.

I solved that by asking the admin to extend my permissions. Publishing now works and topics can be created. However, the topic names seem to be generated automatically from the class names. Can I change this behaviour programmatically, that is, by supplying a self-describing topic name?

This would be useful for creating a FIFO topic, whose name must end with .fifo.

Update

I am now familiar with entity name formatting via SetEntityName (thanks for telling me, Chris).

But my problem seems to be specific to FIFO topics.

This works:

                cfg.Message<CustomerUpdate>(x =>
                {
                    x.SetEntityName("customerupdate");
                });

but this does not work:

                cfg.Message<CustomerUpdate>(x =>
                {
                    x.SetEntityName("customerupdate.fifo");
                });

It results in an exception with this error message:

Invalid parameter: Topic Name

of this type:

Amazon.SimpleNotificationService.Model.InvalidParameterException

with this stacktrace:

Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.HandleExceptionStream(IRequestContext requestContext, IWebResponseData httpErrorResponse, HttpErrorResponseException exception, Stream responseStream)
Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.<HandleExceptionAsync>d__2.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ExceptionHandler`1.<HandleAsync>d__6.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
Amazon.Runtime.Internal.ErrorHandler.<ProcessExceptionAsync>d__8.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ErrorHandler.<InvokeAsync>d__5`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.EndpointDiscoveryHandler.<InvokeAsync>d__2`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
Amazon.Runtime.Internal.EndpointDiscoveryHandler.<InvokeAsync>d__2`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CredentialsRetriever.<InvokeAsync>d__7`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
Amazon.Runtime.Internal.RetryHandler.<InvokeAsync>d__10`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
Amazon.Runtime.Internal.RetryHandler.<InvokeAsync>d__10`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ErrorCallbackHandler.<InvokeAsync>d__5`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.MetricsHandler.<InvokeAsync>d__1`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Contexts.TopicCache.<CreateMissingTopic>d__9.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Contexts.TopicCache.<>c__DisplayClass6_0.<<Get>b__0>d.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
GreenPipes.Caching.Internals.PendingValue`2.<CreateValue>d__6.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Pipeline.ConfigureTopologyFilter`1.<Declare>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Pipeline.ConfigureTopologyFilter`1.<ConfigureTopology>d__5.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Pipeline.ConfigureTopologyFilter`1.<>c__DisplayClass3_0.<<GreenPipes-IFilter<MassTransit-AmazonSqsTransport-ClientContext>-Send>b__0>d.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
GreenPipes.PipeExtensions.<OneTimeSetup>d__2`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Pipeline.ConfigureTopologyFilter`1.<GreenPipes-IFilter<MassTransit-AmazonSqsTransport-ClientContext>-Send>d__3.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Transport.TopicSendTransport.SendPipe`1.<Send>d__5.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.TaskAwaiter.GetResult()
<PublishAsync>d__2.MoveNext() in Publisher.cs: line: 18

Here is my code

    public static void UseMassTransit(this IServiceCollection services, MassTransitConfiguration massTransitConfiguration)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<CustomerChangeConsumer>();
            x.UsingAmazonSqs((context, cfg) =>
            {
                cfg.Host(massTransitConfiguration.Host, h =>
                {
                    h.AccessKey(massTransitConfiguration.AccessKey);
                    h.SecretKey(massTransitConfiguration.SecretKey);

                    h.EnableScopedTopics();
                });

                cfg.ReceiveEndpoint("CustomerChangeConsumer",
                    configurator =>
                    {
                        configurator.ConfigureConsumer<CustomerChangeConsumer>(context);
                    });

                cfg.Message<CustomerUpdate>(x =>
                {
                    // This causes the exception when publishing:
                    x.SetEntityName("customerupdate.fifo");
                    // But this works. No issues when publishing
                    //x.SetEntityName("customerupdate");
                });
            });
        });
        
        services.AddMassTransitHostedService();
    }
}

Solution

You can configure the message topology to specify the entity name formatting.

For instance, to change the topic name for the OrderSubmitted event:

Bus.Factory.CreateUsingAmazonSqs(cfg =>
{
    cfg.Message<OrderSubmitted>(x =>
    {
        x.SetEntityName("order-submitted.fifo");
    });

    cfg.Publish<OrderSubmitted>(x =>
    {
        x.TopicAttributes["FifoTopic"] = "true";
    });
});

To set the MessageGroupId on publish, use:

await bus.Publish<OrderSubmitted>(message, x => x.SetGroupId("..."));

I'm not 100% sure the group ID survives Publish, as opposed to Send.
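
Applied to the CustomerUpdate message from the question, the entity name and the FifoTopic attribute might be combined as in the following untested sketch. It reuses the `UsingAmazonSqs` registration and the `MassTransitConfiguration`, `CustomerChangeConsumer`, and `CustomerUpdate` types from the question, and only the calls that already appear in this thread (`SetEntityName`, `Publish<T>`, `TopicAttributes`). The class name `MassTransitFifoSketch` and the method name `UseMassTransitWithFifoTopic` are hypothetical.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class MassTransitFifoSketch
{
    // Hypothetical variant of the question's UseMassTransit method.
    public static void UseMassTransitWithFifoTopic(
        this IServiceCollection services,
        MassTransitConfiguration massTransitConfiguration)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<CustomerChangeConsumer>();
            x.UsingAmazonSqs((context, cfg) =>
            {
                cfg.Host(massTransitConfiguration.Host, h =>
                {
                    h.AccessKey(massTransitConfiguration.AccessKey);
                    h.SecretKey(massTransitConfiguration.SecretKey);
                    h.EnableScopedTopics();
                });

                cfg.ReceiveEndpoint("CustomerChangeConsumer", configurator =>
                {
                    configurator.ConfigureConsumer<CustomerChangeConsumer>(context);
                });

                // Topic name with the .fifo suffix, as in the question.
                cfg.Message<CustomerUpdate>(m =>
                {
                    m.SetEntityName("customerupdate.fifo");
                });

                // Per the answer: mark the topic as FIFO when it is created.
                cfg.Publish<CustomerUpdate>(p =>
                {
                    p.TopicAttributes["FifoTopic"] = "true";
                });
            });
        });

        services.AddMassTransitHostedService();
    }
}
```

The only change from the question's code is the added `cfg.Publish<CustomerUpdate>` block. Whether this alone removes the "Invalid parameter: Topic Name" exception is an assumption based on the answer above, not something confirmed in this thread.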
