错误“是弹簧集成聚合器 DSL 的单向“MessageHandler" [英] Error 'is a one-way 'MessageHandler' for spring-integration aggregator DSL
问题描述
我正在尝试使用 DSL 测试一些带有 spring-integration 的东西.到目前为止,这只是一个测试,流程很简单:
I'm trying to test some stuff with spring-integration using the DSL. This is only a test so far, the flow is simple:
- 创建一些消息
- 并行处理(记录)它们
- 聚合它们
- 记录聚合
除了聚合器,它工作正常:
Apart from the aggregator, it is working fine:
@Bean
public IntegrationFlow integrationFlow() {
return IntegrationFlows
.from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1, TimeUnit.SECONDS)))
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle((GenericHandler<Integer>) (payload, headers) -> {
System.out.println("\t delaying message:" + payload + " on thread "
+ Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
return payload;
})
.handle(this::logMessage)
.aggregate(a ->
a.releaseStrategy(g -> g.size()>10)
.outputProcessor(g ->
g.getMessages()
.stream()
.map(e -> e.getPayload().toString())
.collect(Collectors.joining(",")))
)
.handle(this::logMessage)
.get();
}
如果我省略了 .aggregate(..) 部分,则示例正在运行.
If I leave out the .aggregate(..), part, the sample is working.
使用聚合器,我得到以下异常:
Withe the aggregator, I get the following exception:
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.faboo.test.ParallelIntegrationApplication$$Lambda$9/1341404543@6fe1b4fb) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
据我所知,它抱怨聚合器没有输出?
As far as I understand, it complains that there is no output from the aggregator?
完整的源代码可以在这里找到:hithub
The full source can be found here: hithub
推荐答案
问题在于聚合器之前的 handle()
- 它没有产生结果,所以没有什么可聚合的......>
The problem is the handle()
before the aggregator - it produces no result so there is nothing to aggregate...
.handle(this::logMessage)
.aggregate(a ->
大概 logMessage(Message>)
有一个 void
返回类型.
Presumably logMessage(Message<?>)
has a void
return type.
如果你想在聚合器之前登录,使用wireTap
,或者改变logMessage
在登录后返回Message>
.
If you want to log before the aggregator, use a wireTap
, or change logMessage
to return the Message<?>
after logging.
.wireTap(sf -> sf.handle(this::logMessage))
这篇关于错误“是弹簧集成聚合器 DSL 的单向“MessageHandler"的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!