等待所有线程在Spring Integration中完成 [英] Waiting for all threads to finish in Spring Integration

查看:342
本文介绍了等待所有线程在Spring Integration中完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个自我执行的jar程序,它在很大程度上依赖于Spring Integration。我遇到的问题是程序在其他Spring bean完成之前终止

I have a self-executable jar program that relies heavily on Spring Integration. The problem I am having is that the program is terminating before the other Spring beans have completely finished.

下面是一个简化版本的我正在使用的代码,如果需要,我可以提供更多的代码/配置。入口点是一个main()方法,它引导Spring并启动导入过程:

Below is a cut-down version of the code I'm using, I can supply more code/configuration if needed. The entry point is a main() method, which bootstraps Spring and starts the import process:

public static void main(String[] args) {
    ctx = new ClassPathXmlApplicationContext("flow.xml");
    DataImporter importer = (DataImporter)ctx.getBean("MyImporterBean");
    try {
        importer.startImport();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        ctx.close();
    }
}

DataImporter包含一个简单的循环,可以将消息发送到a Spring集成网关。这为流提供了一种主动的推送方法,而不是轮询数据的常用方法。这就是我的问题所在:

The DataImporter contains a simple loop that fires messages to a Spring Integration gateway. This delivers an active "push" approach to the flow, rather than the common approach of polling for data. This is where my problem comes in:

public void startImport() throws Exception {
    for (Item item : items) {
        gatewayBean.publish(item);
        Thread.sleep(200); // Yield period
    }
}

为了完整性,流XML看起来类似这样的事情:

For completeness, the flow XML looks something like this:

<gateway default-request-channel="inChannel" service-interface="GatewayBean" />

<splitter input-channel="inChannel" output-channel="splitChannel" />

<payload-type-router input-channel="splitChannel">
    <mapping type="Item" channel="itemChannel" />
    <mapping type="SomeOtherItem" channel="anotherChannel" />
</payload-type-router>

<outbound-channel-adapter channel="itemChannel" ref="DAOBean" method="persist" />

流程开始并有效地处理项目,但是一旦startImport()循环结束,主线程终止并且立即拆除所有Spring Integration线程。这会导致竞争条件,当程序终止时,最后(n)个项目没有完全处理。

The flow starts and processes items effectively, but once the startImport() loop finishes the main thread terminates and tears down all the Spring Integration threads immediately. This results in a race condition, the last (n) items are not completely processed when the program terminates.

我知道保留项目的引用计数正在处理,但事实证明这非常复杂,因为流程经常将消息拆分/路由到多个服务激活器 - 这意味着很难确定每个项目是否已完成。

I have an idea of maintaining a reference count of the items I am processing, but this is proving to be quite complicated, since the flow often splits/routes the messages to multiple service activators - meaning it is difficult to determine if each item has "finished".

我认为我需要的是检查没有Spring bean仍在执行的方法,或者标记发送到网关的所有项目在终止之前已经完全处理。

What I think I need is some way to either check that no Spring beans are still executing, or to flag that all items sent to the gateway have been completely processed before terminating.

我的问题是,我怎么可能去做其中任何一项,或者是否有更好的方法解决我没想过的问题?

推荐答案

您这里没有使用请求 - 响应模式。

You're not using a request-response pattern here.

outbound-channel-adapter是一个即发即弃操作,如果你想等待响应你应该使用一个等待响应的出站网关,并将响应连接到原来的网关,然后在java sendAndReceive中不只是发布。

outbound-channel-adapter is a fire and forget action, if you want to wait for the response you should use an outbound-gateway that will wait for response, and connect the response to the original gateway, then in java sendAndReceive not just publish.

这篇关于等待所有线程在Spring Integration中完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆