项目反应器采用Spring集成时的主要工艺流程程序化方法 [英] Main processing flow programmatic approach when using Spring Integration with Project Reactor
问题描述
我想定义一个流,使用反应器Kafka消费Kafka并写入MongoDB,只有在成功时才将ID写入Kafka。我正在将项目反应器与Spring Integration JavaDSL结合使用,并且我希望有一个FlowBuilder
类在较高级别定义我的管道。我目前有以下方向:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf
.handle(MongoDb.reactiveOutboundChannelAdapter()))
.handle(writeToKafka)
.get();
}
我在docs that there is a support for a different approach, that also works with Project Reactor中看到过。此方法不包括使用IntegrationFlows
。如下所示:
@MessagingGateway
public static interface TestGateway {
@Gateway(requestChannel = "promiseChannel")
Mono<Integer> multiply(Integer value);
}
...
@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
return value * 2;
}
...
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(integers -> ...);
我想知道在使用这两个库时,更推荐使用哪种处理方式。我想知道如何使用第二个示例中的反应式MongoDB适配器。如果没有IntegrationFlows
包装,我甚至不确定第二种方法是否可行。
api
@MessagingGateway
是为高级终端用户推荐答案设计的,尽可能地将消息隐藏在底层。因此,当您开发目标服务的逻辑时,目标服务不受任何消息传递抽象的影响。
可以从IntegrationFlow
使用这样的接口适配器,您应该将其视为常规服务激活器,因此它将如下所示:
.handle("testGateway", "multiply", e -> e.async(true))
async(true)
使该服务激活器订阅返回的Mono
。您可以省略它,这样您就可以自己向下游订阅它了,因为这个Mono
正好是流中下一条消息的payload
。
IntegrationFlow
,如flatMap()
,然后考虑使用流定义中的toReactivePublisher()
运算符返回Publisher<?>
并将其声明为bean。在这种情况下,最好不要使用MongoDb.reactiveOutboundChannelAdapter()
,而只使用ReactiveMongoDbStoringMessageHandler
,让其返回Mono
传播到该Publisher
。
另一方面,如果您希望将@MessagingGateway
与Mono
一起返回,但仍从中调用ReactiveMongoDbStoringMessageHandler
,则将其声明为bean并将其标记为@ServiceActivator
。
我们还有一个ExpressionEvaluatingRequestHandlerAdvice
捕获特定端点上的错误(或成功)并分别处理它们:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#expression-advice
我想您要找的是这样的:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
.handle(writeToKafka)
.get();
}
请注意.handle(reactiveMongoDbStoringMessageHandler)
-它与MongoDb.reactiveOutboundChannelAdapter()
无关。因为这个将ReactiveMessageHandler
包装成ReactiveMessageHandlerAdapter
,用于自动订阅。您需要看起来更像是您希望将Mono<Void>
返回给您自己控制,这样您就可以将其作为writeToKafka
服务的输入,自己订阅该服务,并按照您解释的那样处理成功或错误。关键是,对于反应流,我们不能提供命令性错误处理。该方法与任何异步API的使用方法相同。因此,我们也向反应流的errorChannel
发送错误。
我们可能可以通过类似returnMono(true/false)
改进MongoDb.reactiveOutboundChannelAdapter()
,让您这样的用例开箱即用。
这篇关于项目反应器采用Spring集成时的主要工艺流程程序化方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!