错误“是弹簧集成聚合器 DSL 的单向“MessageHandler" [英] Error 'is a one-way 'MessageHandler' for spring-integration aggregator DSL

查看:52
本文介绍了错误“是弹簧集成聚合器 DSL 的单向“MessageHandler"的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 DSL 测试一些带有 spring-integration 的东西.到目前为止,这只是一个测试,流程很简单:

I'm trying to test some stuff with spring-integration using the DSL. This is only a test so far, the flow is simple:

  • 创建一些消息
  • 并行处理(记录)它们
  • 聚合它们
  • 记录聚合

除了聚合器,它工作正常:

Apart from the aggregator, it is working fine:

@Bean
public IntegrationFlow integrationFlow() {
    return IntegrationFlows
            .from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1, TimeUnit.SECONDS)))
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .handle((GenericHandler<Integer>) (payload, headers) -> {
                System.out.println("\t delaying message:" + payload + " on thread "
                        + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    System.err.println(e.getMessage());
                }
                return payload;
            })
            .handle(this::logMessage)
            .aggregate(a ->
                    a.releaseStrategy(g -> g.size()>10)
                     .outputProcessor(g ->
                             g.getMessages()
                                     .stream()
                                     .map(e -> e.getPayload().toString())
                                     .collect(Collectors.joining(",")))

                     )
            .handle(this::logMessage)
            .get();

}

如果我省略了 .aggregate(..) 部分,则示例正在运行.

If I leave out the .aggregate(..), part, the sample is working.

使用聚合器,我得到以下异常:

Withe the aggregator, I get the following exception:

Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.faboo.test.ParallelIntegrationApplication$$Lambda$9/1341404543@6fe1b4fb) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.

据我所知,它抱怨聚合器没有输出?

As far as I understand, it complains that there is no output from the aggregator?

完整的源代码可以在这里找到:hithub

The full source can be found here: hithub

推荐答案

问题在于聚合器之前的 handle() - 它没有产生结果,所以没有什么可聚合的......>

The problem is the handle() before the aggregator - it produces no result so there is nothing to aggregate...

        .handle(this::logMessage)
        .aggregate(a ->

大概 logMessage(Message) 有一个 void 返回类型.

Presumably logMessage(Message<?>) has a void return type.

如果你想在聚合器之前登录,使用wireTap,或者改变logMessage在登录后返回Message.

If you want to log before the aggregator, use a wireTap, or change logMessage to return the Message<?> after logging.

        .wireTap(sf -> sf.handle(this::logMessage))

这篇关于错误“是弹簧集成聚合器 DSL 的单向“MessageHandler"的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆