有没有办法使用 DSL 来分叉 Spring IntegrationFlow? [英] Is there a way to fork the Spring IntegrationFlow using DSL?

查看:34
本文介绍了有没有办法使用 DSL 来分叉 Spring IntegrationFlow?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想做这样的事情,其中​​网关有效负载是一个字符串和 serviceA &serviceB 两个返回列表.

I want do something like this where the gateway payload is a String and serviceA & serviceB both return lists.

    final IntegrationFlow flowA = flow -> flow
            .handle(serviceA) 
            .handle((payload, headers) -> payload); // List<Object>

    final IntegrationFlow flowB = flow -> flow
            .handle(serviceB) 
            .handle((payload, headers) -> payload); // List<Object>


    return IntegrationFlows
            .from(myGateway) // String payload
            .forkAndMerge(flowA, flowB, executor)
            .handle((payload, headers) -> payload)
            .get();

是否可以将流分叉为两个然后汇总结果?拆分器的大多数示例聚合器涉及拆分列表.

Is it possible to fork the flow into two and then gather up the results? Most examples of splitter & aggregators involve splitting up a list.

推荐答案

查看 .scatterGather() 变体.

此处为 ScatterGatherer 的主要文档.

编辑

示例:

@SpringBootApplication
public class So63605348Application {

    private static final Logger log = LoggerFactory.getLogger(So63605348Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63605348Application.class, args);
    }

    @Bean
    IntegrationFlow flow(TaskExecutor exec) {
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5000)))
                .scatterGather(s -> s.applySequence(true)
                        .recipientFlow(subFlow -> subFlow.channel(c -> c.executor(exec))
                                .<String>handle((p, h) -> {
                                    log.info(p.toString());
                                    return p.toUpperCase();
                                }))
                        .recipientFlow(subFlow -> subFlow.channel(c -> c.executor(exec))
                                .<String>handle((p, h) -> {
                                    log.info(p.toString());
                                    return p + p;
                                })))
                .handle(System.out::println)
                .get();
    }

    @Bean
    public TaskExecutor exec() {
        ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
        exec.setCorePoolSize(2);
        return exec;
    }

}

结果

2020-08-26 17:33:56.769  INFO 50829 --- [         exec-1] com.example.demo.So63605348Application   : foo
2020-08-26 17:33:56.769  INFO 50829 --- [         exec-2] com.example.demo.So63605348Application   : foo
GenericMessage [payload=[foofoo, FOO], headers=...

EDIT2

如果您不想嵌套子流,可以将它们分解...

If you prefer not to nest the subflows, you can factor them out...

@Bean
IntegrationFlow flow(TaskExecutor exec) {
    return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5000)))
            .scatterGather(s -> s.applySequence(true)
                    .recipientFlow(flow2())
                    .recipientFlow(flow3()))
            .handle(System.out::println)
            .get();
}

@Bean
IntegrationFlow flow2() {
    return f -> f
        .<String>handle((p, h) -> {
            log.info(p.toString());
            return p + p;
        });
}

@Bean
IntegrationFlow flow3() {
    return f -> f
        .<String>handle((p, h) -> {
            log.info(p.toString());
            return p.toUpperCase();
        });
}

或者您可以使用发布/订阅频道变体...

Or you can use the pub/sub channel variant...

@Bean
IntegrationFlow flow() {
    return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5000)))
            .scatterGather(pubSub())
            .handle(System.out::println)
            .get();
}

@Bean
PublishSubscribeChannel  pubSub() {
    PublishSubscribeChannel pubSub = new PublishSubscribeChannel(exec());
    pubSub.setApplySequence(true);
    return pubSub;
}

@Bean
IntegrationFlow flow2() {
    return IntegrationFlows.from("pubSub")
        .<String>handle((p, h) -> {
            log.info(p.toString());
            return p + p;
        })
        .get();
}

@Bean
IntegrationFlow flow3() {
    return IntegrationFlows.from("pubSub")
        .<String>handle((p, h) -> {
            log.info(p.toString());
            return p.toUpperCase();
        })
        .get();
}

这篇关于有没有办法使用 DSL 来分叉 Spring IntegrationFlow?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆