项目反应器采用Spring集成时的主要工艺流程程序化方法 [英] Main processing flow programmatic approach when using Spring Integration with Project Reactor

查看:35
本文介绍了项目反应器采用Spring集成时的主要工艺流程程序化方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想定义一个流,使用反应器Kafka消费Kafka并写入MongoDB,只有在成功时才将ID写入Kafka。我正在将项目反应器与Spring Integration JavaDSL结合使用,并且我希望有一个FlowBuilder类在较高级别定义我的管道。我目前有以下方向:

public IntegrationFlow buildFlow() {
   return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
      .publishSubscribeChannel(c -> c
                        .subscribe(sf -> sf
                                .handle(MongoDb.reactiveOutboundChannelAdapter())) 
      .handle(writeToKafka)
      .get();
}

我在docs that there is a support for a different approach, that also works with Project Reactor中看到过。此方法不包括使用IntegrationFlows。如下所示:

@MessagingGateway
public static interface TestGateway {

    @Gateway(requestChannel = "promiseChannel")
    Mono<Integer> multiply(Integer value);

    }

        ...

    @ServiceActivator(inputChannel = "promiseChannel")
    public Integer multiply(Integer value) {
            return value * 2;
    }

        ...

    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(integers -> ...);

我想知道在使用这两个库时,更推荐使用哪种处理方式。我想知道如何使用第二个示例中的反应式MongoDB适配器。如果没有IntegrationFlows包装,我甚至不确定第二种方法是否可行。

api

@MessagingGateway是为高级终端用户推荐答案设计的,尽可能地将消息隐藏在底层。因此,当您开发目标服务的逻辑时,目标服务不受任何消息传递抽象的影响。

可以从IntegrationFlow使用这样的接口适配器,您应该将其视为常规服务激活器,因此它将如下所示:

.handle("testGateway", "multiply", e -> e.async(true))

async(true)使该服务激活器订阅返回的Mono。您可以省略它,这样您就可以自己向下游订阅它了,因为这个Mono正好是流中下一条消息的payload

如果您希望有相反结果:从调用IntegrationFlow,如flatMap(),然后考虑使用流定义中的toReactivePublisher()运算符返回Publisher<?>并将其声明为bean。在这种情况下,最好不要使用MongoDb.reactiveOutboundChannelAdapter(),而只使用ReactiveMongoDbStoringMessageHandler,让其返回Mono传播到该Publisher

另一方面,如果您希望将@MessagingGatewayMono一起返回,但仍从中调用ReactiveMongoDbStoringMessageHandler,则将其声明为bean并将其标记为@ServiceActivator

我们还有一个ExpressionEvaluatingRequestHandlerAdvice捕获特定端点上的错误(或成功)并分别处理它们:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#expression-advice

我想您要找的是这样的:

public IntegrationFlow buildFlow() {
   return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
      .handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
      .handle(writeToKafka)
      .get();
}
请注意.handle(reactiveMongoDbStoringMessageHandler)-它与MongoDb.reactiveOutboundChannelAdapter()无关。因为这个将ReactiveMessageHandler包装成ReactiveMessageHandlerAdapter,用于自动订阅。您需要看起来更像是您希望将Mono<Void>返回给您自己控制,这样您就可以将其作为writeToKafka服务的输入,自己订阅该服务,并按照您解释的那样处理成功或错误。关键是,对于反应流,我们不能提供命令性错误处理。该方法与任何异步API的使用方法相同。因此,我们也向反应流的errorChannel发送错误。

我们可能可以通过类似returnMono(true/false)改进MongoDb.reactiveOutboundChannelAdapter(),让您这样的用例开箱即用。

这篇关于项目反应器采用Spring集成时的主要工艺流程程序化方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆