MassTransit-等待所有活动完成,然后继续处理 [英] MassTransit - Wait for all activities to complete and then continue processing

查看:148
本文介绍了MassTransit-等待所有活动完成,然后继续处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如果我需要进行大量活动,是否会导致资源阻塞或请求超时?

If I have to much activities, does it cause blocking resources or request time out?

这是我的情况:

我有一个api控制器,可将Order请求发送给消费者;我使用Request/Response模式从使用者接收 ErrorMessage 属性,并基于该属性响应返回,如果为空,我想返回 OK() ,否则,返回 BadRequest Ok ,但显示类似以下消息:产品缺货至通知客户.

I have an api controller which sends an Order request to consumer; I use Request/Response patern to recieve ErrorMessage property from consumer and base on that property response back, if it's null I would want to return OK() otherwise, return BadRequest or Ok but with a message like: Product out of stock to notify to the client.

在我的消费者中,我建立了一个包含2个活动的路由清单:

In my consumer, I have build a routing slip which have 2 activities:

  • CreateOrderActivity :创建带有订单详细信息的订单.
  • ReserveProductActivity :如果产品数量<0 我将向用户发布一条带有 ErrorMessage 的消息,并补偿之前的活动.

  • CreateOrderActivity: Which creates an order with order details.
  • ReserveProductActivity: Which reduces the quantity of product in stock, if product quantity < 0 I'll publish a message with an ErrorMessage back to the consumer and compensate the previous activity.

public async Task Consume(ConsumeContext<ProcessOrder> context)
{
    try
    {
        if (!string.IsNullOrEmpty(context.Message.ErrorMessage))
        {
            await context.RespondAsync<OrderSubmitted>(new
            {
                context.Message.OrderId,
                context.Message.ErrorMessage
            });

            return;
        }

        RoutingSlipBuilder builder = new RoutingSlipBuilder(context.Message.OrderId);
        // get configs
        var settings = new Settings(_configuration);

        // Add activities
        builder.AddActivity(settings.CreateOrderActivityName, settings.CreateOrderExecuteAddress);
        builder.SetVariables(new { context.Message.OrderId, context.Message.Address, context.Message.CreatedDate, context.Message.OrderDetails });

        builder.AddActivity(settings.ReserveProductActivityName, settings.ReserveProductExecuteAddress);
        builder.SetVariables(new { context.Message.OrderDetails });


        await context.Execute(builder.Build());

        await context.RespondAsync<OrderSubmitted>(new
        {
            context.Message.OrderId
        });

    }
    catch (Exception ex)
    {
        _log.LogError("Can not create Order {OrderId}", context.Message.OrderId);
        throw new Exception(ex.Message);
    }
}

ReserveProductActivity的代码:

Code for ReserveProductActivity:

    public async Task<ExecutionResult> Execute(ExecuteContext<ReserveProductArguments> context)
    {
        var orderDetails = context.Arguments.OrderDetails;

        foreach (var orderDetail in orderDetails)
        {
            var product = await _productRepository.GetByProductId(orderDetail.ProductId);
            if (product == null) continue;

            var quantity = product.SetQuantity(product.QuantityInStock - orderDetail.Quantity);


            if (quantity < 0)
            {
                var errorMessage = "Out of stock.";
                await context.Publish<ProcessOrder>(new
                {
                    ErrorMessage = errorMessage
                });
                throw new RoutingSlipException(errorMessage);
            }

            await _productRepository.Update(product);
        }

        return context.Completed(new Log(orderDetails.Select(x => x.ProductId).ToList()));
    }

使用方方法中的以下代码行等待context.Execute(builder.Build())

This line of code in a consumer method await context.Execute(builder.Build())

起初,我认为它将构建路由清单并先执行所有活动,然后再转到下一行,但事实并非如此.取而代之的是,它立即转到下一行代码(响应返回到控制器),然后在执行活动之后执行,这不是我想要的.我需要先检查第二个活动中的产品数量,然后根据返还给控制器的信息进行确认.

At first I thought it would build the routing slip and execute all activities first before going to the next line but it's not. Instead it's immediately going to the next line of code (which responses back to controller) and then after execute activities, which is not what I want. I need to check the quantity of product in 2nd activity first and base on that return back to the controller.

(当前,它总是先响应控制器-在 buider.Buid() 之后的行,然后在 quantity<0 ,它仍然会首先进入消耗方法的条件,但由于它已经响应,因此我无法再次在if语句内触发响应.

(In current, it always responses back to controller first - the line after buider.Buid(), and then if quantity < 0 it still goes to the very first if condition of the consume method but since it already responses, I cannot trigger response inside that if statement again).

简而言之,如果产品在第二个活动中仍然可用,我可以像往常一样发送回响应(在 context.Execute(builder.Build())之后执行代码),但是如果 quantity< 0 -我通过 ErrorMessage 发布回使用方方法,我希望它跳转到Consume方法的第一个if条件( if(!string.IsNullOrEmpty(context.Message.ErrorMessage))... )并基于 ErrorMessage 通知客户.

So in short, if product is still available in 2nd activity I can send the reponse back like normal (which executes the code after context.Execute(builder.Build()), but if quantity < 0 - which I publish back to the consumer method with ErrorMessage, I would like it to jump to the very first if condition of Consume method (if(!string.IsNullOrEmpty(context.Message.ErrorMessage)) ...) and base on the ErrorMessage notify the client.

这种方法有什么问题吗?我该如何实现这样的目标?

Is there something wrong with this approach? How can I achieve something like this?

谢谢

推荐答案

没有文档,但是可以使用代理来执行路由清单,并以路由清单的结果响应请求.您可以在单元测试中查看详细信息:

It isn't documented, but it is possible to use a proxy to execute a routing slip, and response to the request with the result of the routing slip. You can see the details in the unit tests:

https://github.com/MassTransit/MassTransit/blob/master/tests/MassTransit.Tests/Courier/RequestRoutingSlip_Specs.cs#L20

您可以创建代理,以构建路由清单并执行该清单,然后创建响应代理-两者均在接收端点上配置为 .Instance 使用者.

You could create the proxy, which builds the routing slip and executes it, and the response proxy - both of which are then configured on a receive endpoint as .Instance consumers.

  class RequestProxy :
        RoutingSlipRequestProxy<Request>
    {
        protected override void BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<Request> request)
        {
        // get configs
        var settings = new Settings(_configuration);

        // Add activities
        builder.AddActivity(settings.CreateOrderActivityName, settings.CreateOrderExecuteAddress);
        builder.SetVariables(new { context.Message.OrderId, context.Message.Address, context.Message.CreatedDate, context.Message.OrderDetails });

        builder.AddActivity(settings.ReserveProductActivityName, settings.ReserveProductExecuteAddress);
        builder.SetVariables(new { context.Message.OrderDetails });
        }
    }


    class ResponseProxy :
        RoutingSlipResponseProxy<Request, Response>
    {
        protected override Response CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, Request request)
        {
            return new Response();
        }
    }

然后您可以从使用者那里调用它,或者将排序逻辑放在代理中(视情况而定),然后使用控制器中的请求客户端发送请求并等待响应.

You could then call it from the consumer, or put the ordering logic in the proxy - whichever makes sense, and then use the request client from your controller to send the request and await the response.

这篇关于MassTransit-等待所有活动完成,然后继续处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆